Skip to content

Commit

Permalink
Cache: Implement resource dict for submit() (#451)
Browse files Browse the repository at this point in the history
* Cache: Implement resource dict for submit()

* Reduce inputs

* bug fixes
  • Loading branch information
jan-janssen authored Oct 27, 2024
1 parent f661177 commit df5e582
Show file tree
Hide file tree
Showing 4 changed files with 21 additions and 12 deletions.
4 changes: 2 additions & 2 deletions executorlib/cache/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@ class FileExecutor(ExecutorBase):
def __init__(
self,
cache_directory: str = "cache",
execute_function: callable = execute_in_subprocess,
cores_per_worker: int = 1,
cwd: Optional[str] = None,
execute_function: callable = execute_in_subprocess,
terminate_function: Optional[callable] = None,
):
"""
Expand All @@ -26,8 +26,8 @@ def __init__(
cache_directory (str, optional): The directory to store cache files. Defaults to "cache".
execute_function (callable, optional): The function to execute tasks. Defaults to execute_in_subprocess.
cores_per_worker (int, optional): The number of CPU cores per worker. Defaults to 1.
cwd (str/None): current working directory where the parallel python task is executed
terminate_function (callable, optional): The function to terminate the tasks.
cwd (str/None): current working directory where the parallel python task is executed
"""
super().__init__()
if execute_function == execute_in_subprocess and terminate_function is None:
Expand Down
13 changes: 9 additions & 4 deletions executorlib/cache/shared.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,9 @@ def done(self) -> bool:
def execute_tasks_h5(
future_queue: queue.Queue,
cache_directory: str,
cores_per_worker: int,
execute_function: callable,
cwd: Optional[str],
cores_per_worker: int = 1,
cwd: Optional[str] = None,
terminate_function: Optional[callable] = None,
) -> None:
"""
Expand Down Expand Up @@ -93,11 +93,16 @@ def execute_tasks_h5(
memory_dict=memory_dict,
file_name_dict=file_name_dict,
)
resource_dict = task_dict["resource_dict"].copy()
if "cores" not in resource_dict:
resource_dict["cores"] = cores_per_worker
if "cwd" not in resource_dict:
resource_dict["cwd"] = cwd
task_key, data_dict = serialize_funct_h5(
fn=task_dict["fn"],
fn_args=task_args,
fn_kwargs=task_kwargs,
resource_dict=task_dict["resource_dict"],
resource_dict=resource_dict,
)
if task_key not in memory_dict.keys():
if task_key + ".h5out" not in os.listdir(cache_directory):
Expand All @@ -111,7 +116,7 @@ def execute_tasks_h5(
task_dependent_lst=[
process_dict[k] for k in future_wait_key_lst
],
cwd=cwd,
resource_dict=resource_dict,
)
file_name_dict[task_key] = os.path.join(
cache_directory, task_key + ".h5out"
Expand Down
13 changes: 10 additions & 3 deletions executorlib/standalone/cache/spawner.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,18 @@
def execute_in_subprocess(
command: list,
task_dependent_lst: list = [],
cwd: Optional[str] = None,
resource_dict: Optional[dict] = None,
) -> subprocess.Popen:
"""
Execute a command in a subprocess.
Args:
command (list): The command to be executed.
task_dependent_lst (list): A list of subprocesses that the current subprocess depends on. Defaults to [].
cwd (str/None): current working directory where the parallel python task is executed
resource_dict (dict): resource dictionary, which defines the resources used for the execution of the function.
Example resource dictionary: {
cwd: None,
}
Returns:
subprocess.Popen: The subprocess object.
Expand All @@ -24,7 +27,11 @@ def execute_in_subprocess(
task_dependent_lst = [
task for task in task_dependent_lst if task.poll() is None
]
return subprocess.Popen(command, universal_newlines=True, cwd=cwd)
if resource_dict is None:
resource_dict = {"cwd": None}
elif len(resource_dict) == 0:
resource_dict = {"cwd": None}
return subprocess.Popen(command, universal_newlines=True, cwd=resource_dict["cwd"])


def terminate_subprocess(task):
Expand Down
3 changes: 0 additions & 3 deletions tests/test_cache_executor_serial.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,6 @@ def test_executor_function(self):
"cache_directory": cache_dir,
"execute_function": execute_in_subprocess,
"cores_per_worker": 1,
"cwd": None,
"terminate_function": terminate_subprocess,
},
)
Expand Down Expand Up @@ -115,7 +114,6 @@ def test_executor_function_dependence_kwargs(self):
"cache_directory": cache_dir,
"execute_function": execute_in_subprocess,
"cores_per_worker": 1,
"cwd": None,
"terminate_function": terminate_subprocess,
},
)
Expand Down Expand Up @@ -157,7 +155,6 @@ def test_executor_function_dependence_args(self):
"cache_directory": cache_dir,
"execute_function": execute_in_subprocess,
"cores_per_worker": 1,
"cwd": None,
"terminate_function": terminate_subprocess,
},
)
Expand Down

0 comments on commit df5e582

Please sign in to comment.