Refactor classes - highlight standalone modules #444
Changes from all commits: a73260a, 1eaafd2, 5c68cf6, abd5ffe, daf1df8
New file (@@ -0,0 +1,148 @@):

```python
import queue
from concurrent.futures import (
    Executor as FutureExecutor,
)
from concurrent.futures import (
    Future,
)
from typing import Optional

from executorlib.standalone.inputcheck import (
    check_resource_dict,
    check_resource_dict_is_empty,
)
from executorlib.standalone.queue import cancel_items_in_queue
from executorlib.standalone.serialize import cloudpickle_register
from executorlib.standalone.thread import RaisingThread


class ExecutorBase(FutureExecutor):
    """
    Base class for the executor.

    Args:
        FutureExecutor: Base class for the executor.
    """

    def __init__(self):
        """
        Initialize the ExecutorBase class.
        """
        cloudpickle_register(ind=3)
        self._future_queue: queue.Queue = queue.Queue()
        self._process: Optional[RaisingThread] = None

    @property
    def info(self) -> Optional[dict]:
        """
        Get the information about the executor.

        Returns:
            Optional[dict]: Information about the executor.
        """
        if self._process is not None and isinstance(self._process, list):
            meta_data_dict = self._process[0]._kwargs.copy()
            if "future_queue" in meta_data_dict.keys():
                del meta_data_dict["future_queue"]
            meta_data_dict["max_workers"] = len(self._process)
            return meta_data_dict
        elif self._process is not None:
            meta_data_dict = self._process._kwargs.copy()
            if "future_queue" in meta_data_dict.keys():
```
🛠️ Refactor suggestion: Simplify dictionary key check

Similarly, update this condition for better readability and efficiency. Apply this diff:

```diff
-            if "future_queue" in meta_data_dict.keys():
+            if "future_queue" in meta_data_dict:
```

(🧰 Tools: 🪛 Ruff)
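For reference, a quick standalone check (not part of the PR) showing that the two membership tests agree, while the plain form skips the intermediate keys view:

```python
# Standalone check, not from the PR: both membership tests give the same answer.
d = {"future_queue": None, "cores": 1}
assert ("future_queue" in d) == ("future_queue" in d.keys())
# The plain form dispatches straight to dict.__contains__, avoiding the
# dict_keys view object that .keys() constructs on every check.
```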
```python
            del meta_data_dict["future_queue"]
            return meta_data_dict
        else:
            return None

    @property
    def future_queue(self) -> queue.Queue:
        """
        Get the future queue.

        Returns:
            queue.Queue: The future queue.
        """
        return self._future_queue

    def submit(self, fn: callable, *args, resource_dict: dict = {}, **kwargs) -> Future:
```
🛠️ Refactor suggestion: Avoid using mutable default arguments

Using a mutable default argument like `{}` can lead to surprising behavior, because the same dictionary object is shared across all calls. Apply this diff:

```diff
-    def submit(self, fn: callable, *args, resource_dict: dict = {}, **kwargs) -> Future:
+    def submit(self, fn: callable, *args, resource_dict: Optional[dict] = None, **kwargs) -> Future:
```

Then, initialize the dictionary inside the method:

```python
if resource_dict is None:
    resource_dict = {}
```

(🧰 Tools: 🪛 Ruff)
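To see why the reviewer flags this, here is a minimal standalone demonstration (illustrative only, not code from this PR) of how a mutable default leaks state between calls:

```python
# The default list is created once, at function definition time, and reused
# by every call that omits the argument.
def append_to(item, target=[]):
    target.append(item)
    return target

print(append_to(1))  # [1]
print(append_to(2))  # [1, 2]  <- state carried over from the first call
```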
```python
        """
        Submits a callable to be executed with the given arguments.

        Schedules the callable to be executed as fn(*args, **kwargs) and returns
        a Future instance representing the execution of the callable.

        Args:
            fn (callable): function to submit for execution
            args: arguments for the submitted function
            kwargs: keyword arguments for the submitted function
            resource_dict (dict): resource dictionary, which defines the resources used for the execution of the
                                  function. Example resource dictionary: {
                                      cores: 1,
                                      threads_per_core: 1,
                                      gpus_per_worker: 0,
                                      oversubscribe: False,
                                      cwd: None,
                                      executor: None,
                                      hostname_localhost: False,
                                  }

        Returns:
            Future: A Future representing the given call.
        """
        check_resource_dict_is_empty(resource_dict=resource_dict)
        check_resource_dict(function=fn)
```
Correct the argument passed to `check_resource_dict`

It seems that `check_resource_dict` should receive the resource dictionary rather than the submitted function. Apply this diff:

```diff
-        check_resource_dict(function=fn)
+        check_resource_dict(resource_dict=resource_dict)
```
```python
        f = Future()
        self._future_queue.put({"fn": fn, "args": args, "kwargs": kwargs, "future": f})
        return f

    def shutdown(self, wait: bool = True, *, cancel_futures: bool = False):
        """
        Clean-up the resources associated with the Executor.

        It is safe to call this method several times. Otherwise, no other
        methods can be called after this one.

        Args:
            wait (bool): If True then shutdown will not return until all running
                futures have finished executing and the resources used by the
                parallel_executors have been reclaimed.
            cancel_futures (bool): If True then shutdown will cancel all pending
                futures. Futures that are completed or running will not be
                cancelled.
        """
        if cancel_futures:
            cancel_items_in_queue(que=self._future_queue)
        self._future_queue.put({"shutdown": True, "wait": wait})
        if wait and self._process is not None:
            self._process.join()
            self._future_queue.join()
        self._process = None
        self._future_queue = None

    def _set_process(self, process: RaisingThread):
        """
        Set the process for the executor.

        Args:
            process (RaisingThread): The process for the executor.
        """
        self._process = process
        self._process.start()

    def __len__(self) -> int:
        """
        Get the length of the executor.

        Returns:
            int: The length of the executor.
        """
        return self._future_queue.qsize()

    def __del__(self):
        """
        Clean-up the resources associated with the Executor.
        """
        try:
            self.shutdown(wait=False)
        except (AttributeError, RuntimeError):
            pass
```
Comment on lines +141 to +148

Avoid relying on `__del__` for cleanup

Relying on the `__del__` method for resource cleanup is discouraged, since Python does not guarantee when, or even whether, it is invoked. Would you like assistance in refactoring the cleanup process to use an explicit method instead of `__del__`?

(🧰 Tools: 🪛 Ruff)
Comment on lines +145 to +148

🛠️ Refactor suggestion: Use `contextlib.suppress`

Instead of using a `try`/`except`/`pass` block, it is more concise to use `contextlib.suppress`. Apply this diff:

```diff
-        try:
-            self.shutdown(wait=False)
-        except (AttributeError, RuntimeError):
-            pass
+        with contextlib.suppress(AttributeError, RuntimeError):
+            self.shutdown(wait=False)
```

This also requires an `import contextlib` at the top of the module. (🧰 Tools: 🪛 Ruff)
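As a sketch of the explicit-cleanup alternative the reviewer hints at (hypothetical, not part of this PR; the class name `ManagedExecutor` is invented for illustration), the executor could lean on the context-manager protocol so callers control exactly when resources are released. Note that `concurrent.futures.Executor` already provides `__enter__`/`__exit__` methods that call `shutdown(wait=True)`, so subclasses inherit this behavior for free:

```python
import contextlib

# Hypothetical sketch of explicit cleanup via the context-manager protocol,
# instead of relying on __del__. Names mirror ExecutorBase but this is not
# the implementation from this PR.
class ManagedExecutor:
    def shutdown(self, wait: bool = True):
        ...  # release queues and worker threads here

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Deterministic cleanup point, independent of garbage collection.
        with contextlib.suppress(AttributeError, RuntimeError):
            self.shutdown(wait=True)
        return False  # never swallow exceptions raised in the with-body


# Usage: resources are reclaimed when the block exits.
with ManagedExecutor() as exe:
    pass  # submit work here
```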
Deleted file (@@ -1,155 +0,0 @@):

```python
from typing import Optional

from executorlib.interactive.executor import (
    InteractiveExecutor,
    InteractiveStepExecutor,
)
from executorlib.shared.inputcheck import (
    check_command_line_argument_lst,
    check_executor,
    check_gpus_per_worker,
    check_init_function,
    check_nested_flux_executor,
    check_oversubscribe,
    check_pmi,
    check_threads_per_core,
    validate_number_of_cores,
)
from executorlib.shared.spawner import (
    SLURM_COMMAND,
    MpiExecSpawner,
    SrunSpawner,
)

try:  # The PyFluxExecutor requires flux-core to be installed.
    from executorlib.interactive.flux import FluxPythonSpawner
except ImportError:
    pass


def create_executor(
    max_workers: int = 1,
    backend: str = "local",
    max_cores: int = 1,
    cache_directory: Optional[str] = None,
    cores_per_worker: int = 1,
    threads_per_core: int = 1,
    gpus_per_worker: int = 0,
    cwd: Optional[str] = None,
    openmpi_oversubscribe: bool = False,
    slurm_cmd_args: list[str] = [],
    flux_executor=None,
    flux_executor_pmi_mode: Optional[str] = None,
    flux_executor_nesting: bool = False,
    hostname_localhost: Optional[bool] = None,
    block_allocation: bool = False,
    init_function: Optional[callable] = None,
):
    """
    Instead of returning an executorlib.Executor object, this function returns either an
    executorlib.mpi.PyMPIExecutor, executorlib.slurm.PySlurmExecutor or executorlib.flux.PyFluxExecutor depending
    on which backend is available. The executorlib.flux.PyFluxExecutor is the preferred choice, while the
    executorlib.mpi.PyMPIExecutor is primarily used for development and testing. The
    executorlib.flux.PyFluxExecutor requires flux-core from the flux-framework to be installed and, in addition,
    flux-sched to enable GPU scheduling. Finally, the executorlib.slurm.PySlurmExecutor requires the SLURM
    workload manager to be installed on the system.

    Args:
        max_workers (int): for backwards compatibility with the standard library, max_workers also defines the
                           number of cores which can be used in parallel - just like the max_cores parameter.
                           Using max_cores is recommended, as computers have a limited number of compute cores.
        backend (str): Switch between the different backends "flux", "local" or "slurm". The default is "local".
        max_cores (int): defines the number of cores which can be used in parallel
        cache_directory (str, optional): The directory to store cache files. Defaults to "cache".
        cores_per_worker (int): number of MPI cores to be used for each function call
        threads_per_core (int): number of OpenMP threads to be used for each function call
        gpus_per_worker (int): number of GPUs per worker - defaults to 0
        cwd (str/None): current working directory where the parallel python task is executed
        openmpi_oversubscribe (bool): adds the `--oversubscribe` command line flag (OpenMPI and SLURM only) -
                                      default False
        slurm_cmd_args (list): Additional command line arguments for the srun call (SLURM only)
        flux_executor (flux.job.FluxExecutor): Flux Python interface to submit the workers to flux
        flux_executor_pmi_mode (str): PMI interface to use (OpenMPI v5 requires pmix) default is None (Flux only)
        flux_executor_nesting (bool): Provide hierarchically nested Flux job scheduler inside the submitted
                                      function.
        hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In
                                      the context of an HPC cluster this is essential to be able to communicate
                                      to an Executor running on a different compute node within the same
                                      allocation. In principle any computer should be able to resolve that its
                                      own hostname points to the same address as localhost. Still, MacOS >= 12
                                      seems to disable this lookup for security reasons, so on MacOS it is
                                      required to set this option to true.
        block_allocation (boolean): To accelerate the submission of a series of python functions with the same
                                    resource requirements, executorlib supports block allocation. In this case
                                    all resources have to be defined on the executor, rather than during the
                                    submission of the individual function.
        init_function (None): optional function to preset arguments for functions which are submitted later
    """
    max_cores = validate_number_of_cores(max_cores=max_cores, max_workers=max_workers)
    check_init_function(block_allocation=block_allocation, init_function=init_function)
    if flux_executor is not None and backend != "flux":
        backend = "flux"
    check_pmi(backend=backend, pmi=flux_executor_pmi_mode)
    executor_kwargs = {
        "cores": cores_per_worker,
        "hostname_localhost": hostname_localhost,
        "cwd": cwd,
        "cache_directory": cache_directory,
    }
    if backend == "flux":
        check_oversubscribe(oversubscribe=openmpi_oversubscribe)
        check_command_line_argument_lst(command_line_argument_lst=slurm_cmd_args)
        executor_kwargs["threads_per_core"] = threads_per_core
        executor_kwargs["gpus_per_core"] = int(gpus_per_worker / cores_per_worker)
        executor_kwargs["flux_executor"] = flux_executor
        executor_kwargs["flux_executor_pmi_mode"] = flux_executor_pmi_mode
        executor_kwargs["flux_executor_nesting"] = flux_executor_nesting
        if block_allocation:
            executor_kwargs["init_function"] = init_function
            return InteractiveExecutor(
                max_workers=int(max_cores / cores_per_worker),
                executor_kwargs=executor_kwargs,
                spawner=FluxPythonSpawner,
            )
        else:
            return InteractiveStepExecutor(
                max_cores=max_cores,
                executor_kwargs=executor_kwargs,
                spawner=FluxPythonSpawner,
            )
    elif backend == "slurm":
        check_executor(executor=flux_executor)
        check_nested_flux_executor(nested_flux_executor=flux_executor_nesting)
        executor_kwargs["threads_per_core"] = threads_per_core
        executor_kwargs["gpus_per_core"] = int(gpus_per_worker / cores_per_worker)
        executor_kwargs["slurm_cmd_args"] = slurm_cmd_args
        executor_kwargs["openmpi_oversubscribe"] = openmpi_oversubscribe
        if block_allocation:
            executor_kwargs["init_function"] = init_function
            return InteractiveExecutor(
                max_workers=int(max_cores / cores_per_worker),
                executor_kwargs=executor_kwargs,
                spawner=SrunSpawner,
            )
        else:
            return InteractiveStepExecutor(
                max_cores=max_cores,
                executor_kwargs=executor_kwargs,
                spawner=SrunSpawner,
            )
    else:  # backend="local"
        check_threads_per_core(threads_per_core=threads_per_core)
        check_gpus_per_worker(gpus_per_worker=gpus_per_worker)
        check_command_line_argument_lst(command_line_argument_lst=slurm_cmd_args)
        check_executor(executor=flux_executor)
        check_nested_flux_executor(nested_flux_executor=flux_executor_nesting)
        executor_kwargs["openmpi_oversubscribe"] = openmpi_oversubscribe
        if block_allocation:
            executor_kwargs["init_function"] = init_function
            return InteractiveExecutor(
                max_workers=int(max_cores / cores_per_worker),
                executor_kwargs=executor_kwargs,
                spawner=MpiExecSpawner,
            )
        else:
            return InteractiveStepExecutor(
                max_cores=max_cores,
                executor_kwargs=executor_kwargs,
                spawner=MpiExecSpawner,
            )
```
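For orientation, a hypothetical usage sketch of the removed helper (argument names are taken from the signature above; this assumes the pre-refactor executorlib is installed and is not code from this PR):

```python
# With backend="local" and block_allocation=True, create_executor returns an
# InteractiveExecutor spawning workers via mpiexec, per the branch logic above.
exe = create_executor(
    max_cores=2,
    backend="local",
    cores_per_worker=1,
    block_allocation=True,
)

future = exe.submit(sum, [1, 2, 3])  # standard concurrent.futures interface
print(future.result())  # 6
exe.shutdown(wait=True)
```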
🛠️ Refactor suggestion: Simplify dictionary key check

(This comment refers to the first `"future_queue" in meta_data_dict.keys()` check in the new file above.) Instead of using `"key" in dict.keys()`, it's more Pythonic to use `"key" in dict`. Apply this diff:

```diff
-            if "future_queue" in meta_data_dict.keys():
+            if "future_queue" in meta_data_dict:
```

(🧰 Tools: 🪛 Ruff)