From df5e58234a008cf17fbcd5893ab73fbaa2920707 Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Sun, 27 Oct 2024 21:52:32 +0100 Subject: [PATCH] Cache: Implement resource dict for submit() (#451) * Cache: Implement resource dict for submit() * Reduce inputs * bug fixes --- executorlib/cache/executor.py | 4 ++-- executorlib/cache/shared.py | 13 +++++++++---- executorlib/standalone/cache/spawner.py | 13 ++++++++++--- tests/test_cache_executor_serial.py | 3 --- 4 files changed, 21 insertions(+), 12 deletions(-) diff --git a/executorlib/cache/executor.py b/executorlib/cache/executor.py index b9800103..f9919680 100644 --- a/executorlib/cache/executor.py +++ b/executorlib/cache/executor.py @@ -14,9 +14,9 @@ class FileExecutor(ExecutorBase): def __init__( self, cache_directory: str = "cache", - execute_function: callable = execute_in_subprocess, cores_per_worker: int = 1, cwd: Optional[str] = None, + execute_function: callable = execute_in_subprocess, terminate_function: Optional[callable] = None, ): """ @@ -26,8 +26,8 @@ def __init__( cache_directory (str, optional): The directory to store cache files. Defaults to "cache". execute_function (callable, optional): The function to execute tasks. Defaults to execute_in_subprocess. cores_per_worker (int, optional): The number of CPU cores per worker. Defaults to 1. - cwd (str/None): current working directory where the parallel python task is executed terminate_function (callable, optional): The function to terminate the tasks. + cwd (str/None): current working directory where the parallel python task is executed """ super().__init__() if execute_function == execute_in_subprocess and terminate_function is None: diff --git a/executorlib/cache/shared.py b/executorlib/cache/shared.py index 5bb8764e..2f8ba947 100644 --- a/executorlib/cache/shared.py +++ b/executorlib/cache/shared.py @@ -49,9 +49,9 @@ def done(self) -> bool: def execute_tasks_h5( future_queue: queue.Queue, cache_directory: str, - cores_per_worker: int, execute_function: callable, - cwd: Optional[str], + cores_per_worker: int = 1, + cwd: Optional[str] = None, terminate_function: Optional[callable] = None, ) -> None: """ @@ -93,11 +93,16 @@ def execute_tasks_h5( memory_dict=memory_dict, file_name_dict=file_name_dict, ) + resource_dict = task_dict["resource_dict"].copy() + if "cores" not in resource_dict: + resource_dict["cores"] = cores_per_worker + if "cwd" not in resource_dict: + resource_dict["cwd"] = cwd task_key, data_dict = serialize_funct_h5( fn=task_dict["fn"], fn_args=task_args, fn_kwargs=task_kwargs, - resource_dict=task_dict["resource_dict"], + resource_dict=resource_dict, ) if task_key not in memory_dict.keys(): if task_key + ".h5out" not in os.listdir(cache_directory): @@ -111,7 +116,7 @@ def execute_tasks_h5( task_dependent_lst=[ process_dict[k] for k in future_wait_key_lst ], - cwd=cwd, + resource_dict=resource_dict, ) file_name_dict[task_key] = os.path.join( cache_directory, task_key + ".h5out" diff --git a/executorlib/standalone/cache/spawner.py b/executorlib/standalone/cache/spawner.py index aa6a39dc..b4da5ae3 100644 --- a/executorlib/standalone/cache/spawner.py +++ b/executorlib/standalone/cache/spawner.py @@ -6,7 +6,7 @@ def execute_in_subprocess( command: list, task_dependent_lst: list = [], - cwd: Optional[str] = None, + resource_dict: Optional[dict] = None, ) -> subprocess.Popen: """ Execute a command in a subprocess. @@ -14,7 +14,10 @@ def execute_in_subprocess( Args: command (list): The command to be executed. task_dependent_lst (list): A list of subprocesses that the current subprocess depends on. Defaults to []. - cwd (str/None): current working directory where the parallel python task is executed + resource_dict (dict): resource dictionary, which defines the resources used for the execution of the function. + Example resource dictionary: { + cwd: None, + } Returns: subprocess.Popen: The subprocess object. @@ -24,7 +27,11 @@ def execute_in_subprocess( task_dependent_lst = [ task for task in task_dependent_lst if task.poll() is None ] - return subprocess.Popen(command, universal_newlines=True, cwd=cwd) + if resource_dict is None: + resource_dict = {"cwd": None} + elif len(resource_dict) == 0: + resource_dict = {"cwd": None} + return subprocess.Popen(command, universal_newlines=True, cwd=resource_dict["cwd"]) def terminate_subprocess(task): diff --git a/tests/test_cache_executor_serial.py b/tests/test_cache_executor_serial.py index 7745f572..491167dc 100644 --- a/tests/test_cache_executor_serial.py +++ b/tests/test_cache_executor_serial.py @@ -73,7 +73,6 @@ def test_executor_function(self): "cache_directory": cache_dir, "execute_function": execute_in_subprocess, "cores_per_worker": 1, - "cwd": None, "terminate_function": terminate_subprocess, }, ) @@ -115,7 +114,6 @@ def test_executor_function_dependence_kwargs(self): "cache_directory": cache_dir, "execute_function": execute_in_subprocess, "cores_per_worker": 1, - "cwd": None, "terminate_function": terminate_subprocess, }, ) @@ -157,7 +155,6 @@ def test_executor_function_dependence_args(self): "cache_directory": cache_dir, "execute_function": execute_in_subprocess, "cores_per_worker": 1, - "cwd": None, "terminate_function": terminate_subprocess, }, )