Skip to content

Move cache into the resource dict #496

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 0 additions & 7 deletions executorlib/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ class Executor:
cores which can be used in parallel - just like the max_cores parameter. Using max_cores is
recommended, as computers have a limited number of compute cores.
backend (str): Switch between the different backends "flux", "local" or "slurm". The default is "local".
cache_directory (str, optional): The directory to store cache files. Defaults to "cache".
max_cores (int): defines the number of cores which can be used in parallel
resource_dict (dict): A dictionary of resources required by the task. With the following keys:
- cores_per_worker (int): number of MPI cores to be used for each function call
Expand Down Expand Up @@ -85,7 +84,6 @@ def __init__(
self,
max_workers: Optional[int] = None,
backend: str = "local",
cache_directory: Optional[str] = None,
max_cores: Optional[int] = None,
resource_dict: Optional[dict] = None,
flux_executor=None,
Expand All @@ -106,7 +104,6 @@ def __new__(
cls,
max_workers: Optional[int] = None,
backend: str = "local",
cache_directory: Optional[str] = None,
max_cores: Optional[int] = None,
resource_dict: Optional[dict] = None,
flux_executor=None,
Expand All @@ -133,7 +130,6 @@ def __new__(
number of cores which can be used in parallel - just like the max_cores parameter. Using
max_cores is recommended, as computers have a limited number of compute cores.
backend (str): Switch between the different backends "flux", "local" or "slurm". The default is "local".
cache_directory (str, optional): The directory to store cache files. Defaults to "cache".
max_cores (int): defines the number of cores which can be used in parallel
resource_dict (dict): A dictionary of resources required by the task. With the following keys:
- cores (int): number of MPI cores to be used for each function call
Expand Down Expand Up @@ -186,7 +182,6 @@ def __new__(
max_workers=max_workers,
backend=backend,
max_cores=max_cores,
cache_directory=cache_directory,
resource_dict=resource_dict,
flux_executor=flux_executor,
flux_executor_pmi_mode=flux_executor_pmi_mode,
Expand All @@ -202,7 +197,6 @@ def __new__(
return ExecutorWithDependencies(
max_workers=max_workers,
backend=backend,
cache_directory=cache_directory,
max_cores=max_cores,
resource_dict=resource_dict,
flux_executor=flux_executor,
Expand All @@ -221,7 +215,6 @@ def __new__(
return create_executor(
max_workers=max_workers,
backend=backend,
cache_directory=cache_directory,
max_cores=max_cores,
resource_dict=resource_dict,
flux_executor=flux_executor,
Expand Down
9 changes: 2 additions & 7 deletions executorlib/cache/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
class FileExecutor(ExecutorBase):
def __init__(
self,
cache_directory: str = "cache",
resource_dict: Optional[dict] = None,
execute_function: callable = execute_with_pysqa,
terminate_function: Optional[callable] = None,
Expand All @@ -38,7 +37,6 @@ def __init__(
Initialize the FileExecutor.

Args:
cache_directory (str, optional): The directory to store cache files. Defaults to "cache".
resource_dict (dict): A dictionary of resources required by the task. With the following keys:
- cores (int): number of MPI cores to be used for each function call
- cwd (str/None): current working directory where the parallel python task is executed
Expand All @@ -52,6 +50,7 @@ def __init__(
default_resource_dict = {
"cores": 1,
"cwd": None,
"cache": "cache",
}
if resource_dict is None:
resource_dict = {}
Expand All @@ -60,7 +59,7 @@ def __init__(
)
if execute_function == execute_in_subprocess and terminate_function is None:
terminate_function = terminate_subprocess
cache_directory_path = os.path.abspath(cache_directory)
cache_directory_path = os.path.abspath(resource_dict.pop("cache"))
os.makedirs(cache_directory_path, exist_ok=True)
self._set_process(
RaisingThread(
Expand All @@ -83,7 +82,6 @@ def create_file_executor(
max_workers: int = 1,
backend: str = "pysqa_flux",
max_cores: int = 1,
cache_directory: Optional[str] = None,
resource_dict: Optional[dict] = None,
flux_executor=None,
flux_executor_pmi_mode: Optional[str] = None,
Expand All @@ -94,8 +92,6 @@ def create_file_executor(
init_function: Optional[callable] = None,
disable_dependencies: bool = False,
):
if cache_directory is None:
cache_directory = "executorlib_cache"
if block_allocation:
raise ValueError(
"The option block_allocation is not available with the pysqa based backend."
Expand All @@ -110,7 +106,6 @@ def create_file_executor(
check_executor(executor=flux_executor)
check_nested_flux_executor(nested_flux_executor=flux_executor_nesting)
return FileExecutor(
cache_directory=cache_directory,
resource_dict=resource_dict,
pysqa_config_directory=pysqa_config_directory,
backend=backend.split("pysqa_")[-1],
Expand Down
3 changes: 0 additions & 3 deletions executorlib/interactive/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,6 @@ def create_executor(
max_workers: Optional[int] = None,
backend: str = "local",
max_cores: Optional[int] = None,
cache_directory: Optional[str] = None,
resource_dict: Optional[dict] = None,
flux_executor=None,
flux_executor_pmi_mode: Optional[str] = None,
Expand All @@ -173,7 +172,6 @@ def create_executor(
recommended, as computers have a limited number of compute cores.
backend (str): Switch between the different backends "flux", "local" or "slurm". The default is "local".
max_cores (int): defines the number of cores which can be used in parallel
cache_directory (str, optional): The directory to store cache files. Defaults to "cache".
resource_dict (dict): A dictionary of resources required by the task. With the following keys:
- cores (int): number of MPI cores to be used for each function call
- threads_per_core (int): number of OpenMP threads to be used for each function call
Expand Down Expand Up @@ -202,7 +200,6 @@ def create_executor(
backend = "flux"
check_pmi(backend=backend, pmi=flux_executor_pmi_mode)
cores_per_worker = resource_dict["cores"]
resource_dict["cache_directory"] = cache_directory
resource_dict["hostname_localhost"] = hostname_localhost
if backend == "flux":
check_oversubscribe(oversubscribe=resource_dict["openmpi_oversubscribe"])
Expand Down
7 changes: 3 additions & 4 deletions executorlib/interactive/shared.py
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,6 @@ def execute_parallel_tasks(
spawner: BaseSpawner = MpiExecSpawner,
hostname_localhost: Optional[bool] = None,
init_function: Optional[Callable] = None,
cache_directory: Optional[str] = None,
**kwargs,
) -> None:
"""
Expand All @@ -221,7 +220,6 @@ def execute_parallel_tasks(
this look up for security reasons. So on MacOS it is required to set this
option to true
init_function (callable): optional function to preset arguments for functions which are submitted later
cache_directory (str, optional): The directory to store cache files. Defaults to "cache".
"""
interface = interface_bootup(
command_lst=_get_backend_path(
Expand All @@ -242,7 +240,8 @@ def execute_parallel_tasks(
future_queue.join()
break
elif "fn" in task_dict.keys() and "future" in task_dict.keys():
if cache_directory is None:
resource_dict = task_dict.get("resource_dict", {})
if "cache" in resource_dict:
_execute_task(
interface=interface, task_dict=task_dict, future_queue=future_queue
)
Expand All @@ -251,7 +250,7 @@ def execute_parallel_tasks(
interface=interface,
task_dict=task_dict,
future_queue=future_queue,
cache_directory=cache_directory,
cache_directory=resource_dict["cache"],
)


Expand Down
3 changes: 1 addition & 2 deletions tests/test_cache_executor_pysqa_flux.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,8 @@ class TestCacheExecutorPysqa(unittest.TestCase):
def test_executor(self):
with Executor(
backend="pysqa_flux",
resource_dict={"cores": 2, "cwd": "cache"},
resource_dict={"cores": 2, "cwd": "cache", "cache": "cache"},
block_allocation=False,
cache_directory="cache",
) as exe:
cloudpickle_register(ind=1)
fs1 = exe.submit(mpi_funct, 1)
Expand Down
3 changes: 1 addition & 2 deletions tests/test_executor_backend_mpi.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,10 +98,9 @@ def tearDown(self):
def test_meta_executor_parallel_cache(self):
with Executor(
max_workers=2,
resource_dict={"cores": 2},
resource_dict={"cores": 2, "cache": "./cache"},
backend="local",
block_allocation=True,
cache_directory="./cache",
) as exe:
cloudpickle_register(ind=1)
time_1 = time.time()
Expand Down
Loading