Refactor classes - highlight standalone modules #444

Merged: 5 commits, Oct 27, 2024
7 changes: 3 additions & 4 deletions executorlib/__init__.py
@@ -1,12 +1,11 @@
 from typing import Optional
 
 from executorlib._version import get_versions as _get_versions
-from executorlib.interactive import create_executor
-from executorlib.interactive.dependencies import ExecutorWithDependencies
-from executorlib.shared.inputcheck import (
+from executorlib.interactive.executor import ExecutorWithDependencies, create_executor
+from executorlib.standalone.inputcheck import (
     check_plot_dependency_graph as _check_plot_dependency_graph,
 )
-from executorlib.shared.inputcheck import (
+from executorlib.standalone.inputcheck import (
     check_refresh_rate as _check_refresh_rate,
 )
 
4 changes: 2 additions & 2 deletions executorlib/backend/interactive_parallel.py
@@ -4,8 +4,8 @@
 
 import cloudpickle
 
-from executorlib.interactive.backend import call_funct, parse_arguments
-from executorlib.shared.communication import (
+from executorlib.standalone.interactive.backend import call_funct, parse_arguments
+from executorlib.standalone.interactive.communication import (
     interface_connect,
     interface_receive,
     interface_send,
4 changes: 2 additions & 2 deletions executorlib/backend/interactive_serial.py
@@ -2,8 +2,8 @@
 from os.path import abspath
 from typing import List, Optional
 
-from executorlib.interactive.backend import call_funct, parse_arguments
-from executorlib.shared.communication import (
+from executorlib.standalone.interactive.backend import call_funct, parse_arguments
+from executorlib.standalone.interactive.communication import (
     interface_connect,
     interface_receive,
     interface_send,
Empty file added: executorlib/base/__init__.py
148 changes: 148 additions & 0 deletions executorlib/base/executor.py
@@ -0,0 +1,148 @@
import queue
from concurrent.futures import (
    Executor as FutureExecutor,
)
from concurrent.futures import (
    Future,
)
from typing import Optional

from executorlib.standalone.inputcheck import (
    check_resource_dict,
    check_resource_dict_is_empty,
)
from executorlib.standalone.queue import cancel_items_in_queue
from executorlib.standalone.serialize import cloudpickle_register
from executorlib.standalone.thread import RaisingThread


class ExecutorBase(FutureExecutor):
    """
    Base class for the executor.

    Args:
        FutureExecutor: Base class for the executor.
    """

    def __init__(self):
        """
        Initialize the ExecutorBase class.
        """
        cloudpickle_register(ind=3)
        self._future_queue: queue.Queue = queue.Queue()
        self._process: Optional[RaisingThread] = None

    @property
    def info(self) -> Optional[dict]:
        """
        Get the information about the executor.

        Returns:
            Optional[dict]: Information about the executor.
        """
        if self._process is not None and isinstance(self._process, list):
            meta_data_dict = self._process[0]._kwargs.copy()
            if "future_queue" in meta_data_dict.keys():
🛠️ Refactor suggestion (Contributor): Simplify dictionary key check

Instead of using "key in dict.keys()", it is more Pythonic to use "key in dict".

Apply this diff:

- if "future_queue" in meta_data_dict.keys():
+ if "future_queue" in meta_data_dict:

🪛 Ruff (SIM118), line 45: Use "key in dict" instead of "key in dict.keys()"; remove ".keys()".
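
For illustration, a minimal self-contained check of the suggested simplification (the dictionary contents here are made up):

meta_data_dict = {"future_queue": object(), "max_workers": 2}

# Both forms test key membership; the bare "in" skips the redundant
# dict_keys view, which is what Ruff's SIM118 rule recommends.
assert ("future_queue" in meta_data_dict.keys()) == ("future_queue" in meta_data_dict)

if "future_queue" in meta_data_dict:
    del meta_data_dict["future_queue"]
print(meta_data_dict)  # {'max_workers': 2}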

                del meta_data_dict["future_queue"]
            meta_data_dict["max_workers"] = len(self._process)
            return meta_data_dict
        elif self._process is not None:
            meta_data_dict = self._process._kwargs.copy()
            if "future_queue" in meta_data_dict.keys():
🛠️ Refactor suggestion (Contributor): Simplify dictionary key check

Similarly, update this condition for better readability and efficiency.

Apply this diff:

- if "future_queue" in meta_data_dict.keys():
+ if "future_queue" in meta_data_dict:

🪛 Ruff (SIM118), line 51: Use "key in dict" instead of "key in dict.keys()"; remove ".keys()".

                del meta_data_dict["future_queue"]
            return meta_data_dict
        else:
            return None

    @property
    def future_queue(self) -> queue.Queue:
        """
        Get the future queue.

        Returns:
            queue.Queue: The future queue.
        """
        return self._future_queue

    def submit(self, fn: callable, *args, resource_dict: dict = {}, **kwargs) -> Future:
⚠️ Potential issue (Contributor): Avoid using mutable default arguments

Using a mutable default argument like {} for resource_dict can lead to unexpected behavior, because the same dictionary is shared across all calls to the function. It is better to default to None and initialize the dictionary inside the function.

Apply this diff:

- def submit(self, fn: callable, *args, resource_dict: dict = {}, **kwargs) -> Future:
+ def submit(self, fn: callable, *args, resource_dict: Optional[dict] = None, **kwargs) -> Future:

Then initialize resource_dict within the function:

if resource_dict is None:
    resource_dict = {}

🪛 Ruff (B006), line 67: Do not use mutable data structures for argument defaults; replace with None and initialize within the function.
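
As a self-contained illustration of the pitfall (the function names below are made up for the example, not part of executorlib):

# Illustrative only: the default dict is created once and shared by ALL calls.
def submit_bad(task, resource_dict={}):
    resource_dict.setdefault("cores", 1)
    return resource_dict

# Fresh dict per call: the recommended pattern.
def submit_good(task, resource_dict=None):
    if resource_dict is None:
        resource_dict = {}
    resource_dict.setdefault("cores", 1)
    return resource_dict

first = submit_bad("task-a")
first["cores"] = 8                # mutates the shared default
print(submit_bad("task-b"))       # {'cores': 8} - state leaked between calls
print(submit_good("task-b"))      # {'cores': 1} - independent dict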

"""
Submits a callable to be executed with the given arguments.

Schedules the callable to be executed as fn(*args, **kwargs) and returns
a Future instance representing the execution of the callable.

Args:
fn (callable): function to submit for execution
args: arguments for the submitted function
kwargs: keyword arguments for the submitted function
resource_dict (dict): resource dictionary, which defines the resources used for the execution of the
function. Example resource dictionary: {
cores: 1,
threads_per_core: 1,
gpus_per_worker: 0,
oversubscribe: False,
cwd: None,
executor: None,
hostname_localhost: False,
}

Returns:
Future: A Future representing the given call.
"""
check_resource_dict_is_empty(resource_dict=resource_dict)
check_resource_dict(function=fn)
⚠️ Potential issue (Contributor): Correct the argument passed to check_resource_dict

check_resource_dict is called with function=fn, but it likely expects a resource_dict argument. Ensure that the correct parameter is passed to the function.

Apply this diff:

- check_resource_dict(function=fn)
+ check_resource_dict(resource_dict=resource_dict)

(Committable suggestion was skipped due to low confidence.)

        f = Future()
        self._future_queue.put({"fn": fn, "args": args, "kwargs": kwargs, "future": f})
        return f

    def shutdown(self, wait: bool = True, *, cancel_futures: bool = False):
        """
        Clean-up the resources associated with the Executor.

        It is safe to call this method several times. Otherwise, no other
        methods can be called after this one.

        Args:
            wait (bool): If True then shutdown will not return until all running
                futures have finished executing and the resources used by the
                parallel_executors have been reclaimed.
            cancel_futures (bool): If True then shutdown will cancel all pending
                futures. Futures that are completed or running will not be
                cancelled.
        """
        if cancel_futures:
            cancel_items_in_queue(que=self._future_queue)
        self._future_queue.put({"shutdown": True, "wait": wait})
        if wait and self._process is not None:
            self._process.join()
            self._future_queue.join()
        self._process = None
        self._future_queue = None

    def _set_process(self, process: RaisingThread):
        """
        Set the process for the executor.

        Args:
            process (RaisingThread): The process for the executor.
        """
        self._process = process
        self._process.start()

    def __len__(self) -> int:
        """
        Get the length of the executor.

        Returns:
            int: The length of the executor.
        """
        return self._future_queue.qsize()

    def __del__(self):
        """
        Clean-up the resources associated with the Executor.
        """
        try:
            self.shutdown(wait=False)
        except (AttributeError, RuntimeError):
            pass
Comment on lines +141 to +148

⚠️ Potential issue (Contributor): Avoid using __del__ for resource cleanup

Relying on the __del__ method for cleanup is not recommended, because the timing of object destruction is not guaranteed and can lead to unexpected behavior. Consider providing an explicit close() or shutdown() method that users can call to clean up resources.

Would you like assistance in refactoring the cleanup process to use an explicit method instead of __del__?

Comment on lines +145 to +148

🛠️ Refactor suggestion (Contributor): Use contextlib.suppress for exception handling

Instead of using a try-except block that catches exceptions and does nothing, you can use contextlib.suppress to handle the exceptions more elegantly.

Apply this diff:

- try:
-     self.shutdown(wait=False)
- except (AttributeError, RuntimeError):
-     pass
+ with contextlib.suppress(AttributeError, RuntimeError):
+     self.shutdown(wait=False)

🪛 Ruff (SIM105), lines 145-148: Use contextlib.suppress(AttributeError, RuntimeError) instead of try-except-pass.
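
Note that the diff above assumes an "import contextlib" is also added at the top of the file. A tiny self-contained demonstration of the pattern:

import contextlib

d = {}

# Equivalent to: try: del d["missing"] / except KeyError: pass
with contextlib.suppress(KeyError):
    del d["missing"]

print("KeyError was suppressed; execution continues")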

4 changes: 2 additions & 2 deletions executorlib/cache/backend.py
@@ -1,8 +1,8 @@
 import os
 from typing import Any
 
-from executorlib.shared.cache import FutureItem
-from executorlib.shared.hdf import dump, load
+from executorlib.cache.shared import FutureItem
+from executorlib.standalone.hdf import dump, load
 
 
 def backend_load_file(file_name: str) -> dict:
6 changes: 3 additions & 3 deletions executorlib/cache/executor.py
@@ -1,8 +1,8 @@
 import os
 
-from executorlib.shared.cache import execute_in_subprocess, execute_tasks_h5
-from executorlib.shared.executor import ExecutorBase
-from executorlib.shared.thread import RaisingThread
+from executorlib.base.executor import ExecutorBase
+from executorlib.cache.shared import execute_in_subprocess, execute_tasks_h5
+from executorlib.standalone.thread import RaisingThread
 
 
 class FileExecutor(ExecutorBase):
6 changes: 3 additions & 3 deletions executorlib/shared/cache.py → executorlib/cache/shared.py
@@ -6,9 +6,9 @@
 from concurrent.futures import Future
 from typing import Tuple
 
-from executorlib.shared.command import get_command_path
-from executorlib.shared.hdf import dump, get_output
-from executorlib.shared.serialize import serialize_funct_h5
+from executorlib.standalone.command import get_command_path
+from executorlib.standalone.hdf import dump, get_output
+from executorlib.standalone.serialize import serialize_funct_h5
 
 
 class FutureItem:
155 changes: 0 additions & 155 deletions executorlib/interactive/__init__.py
@@ -1,155 +0,0 @@
from typing import Optional

from executorlib.interactive.executor import (
    InteractiveExecutor,
    InteractiveStepExecutor,
)
from executorlib.shared.inputcheck import (
    check_command_line_argument_lst,
    check_executor,
    check_gpus_per_worker,
    check_init_function,
    check_nested_flux_executor,
    check_oversubscribe,
    check_pmi,
    check_threads_per_core,
    validate_number_of_cores,
)
from executorlib.shared.spawner import (
    SLURM_COMMAND,
    MpiExecSpawner,
    SrunSpawner,
)

try:  # The PyFluxExecutor requires flux-core to be installed.
    from executorlib.interactive.flux import FluxPythonSpawner
except ImportError:
    pass


def create_executor(
    max_workers: int = 1,
    backend: str = "local",
    max_cores: int = 1,
    cache_directory: Optional[str] = None,
    cores_per_worker: int = 1,
    threads_per_core: int = 1,
    gpus_per_worker: int = 0,
    cwd: Optional[str] = None,
    openmpi_oversubscribe: bool = False,
    slurm_cmd_args: list[str] = [],
    flux_executor=None,
    flux_executor_pmi_mode: Optional[str] = None,
    flux_executor_nesting: bool = False,
    hostname_localhost: Optional[bool] = None,
    block_allocation: bool = False,
    init_function: Optional[callable] = None,
):
"""
Instead of returning a executorlib.Executor object this function returns either a executorlib.mpi.PyMPIExecutor,
executorlib.slurm.PySlurmExecutor or executorlib.flux.PyFluxExecutor depending on which backend is available. The
executorlib.flux.PyFluxExecutor is the preferred choice while the executorlib.mpi.PyMPIExecutor is primarily used
for development and testing. The executorlib.flux.PyFluxExecutor requires flux-core from the flux-framework to be
installed and in addition flux-sched to enable GPU scheduling. Finally, the executorlib.slurm.PySlurmExecutor
requires the SLURM workload manager to be installed on the system.

Args:
max_workers (int): for backwards compatibility with the standard library, max_workers also defines the number of
cores which can be used in parallel - just like the max_cores parameter. Using max_cores is
recommended, as computers have a limited number of compute cores.
backend (str): Switch between the different backends "flux", "local" or "slurm". The default is "local".
max_cores (int): defines the number cores which can be used in parallel
cache_directory (str, optional): The directory to store cache files. Defaults to "cache".
cores_per_worker (int): number of MPI cores to be used for each function call
threads_per_core (int): number of OpenMP threads to be used for each function call
gpus_per_worker (int): number of GPUs per worker - defaults to 0
cwd (str/None): current working directory where the parallel python task is executed
openmpi_oversubscribe (bool): adds the `--oversubscribe` command line flag (OpenMPI and SLURM only) - default False
slurm_cmd_args (list): Additional command line arguments for the srun call (SLURM only)
flux_executor (flux.job.FluxExecutor): Flux Python interface to submit the workers to flux
flux_executor_pmi_mode (str): PMI interface to use (OpenMPI v5 requires pmix) default is None (Flux only)
flux_executor_nesting (bool): Provide hierarchically nested Flux job scheduler inside the submitted function.
hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the
context of an HPC cluster this essential to be able to communicate to an Executor
running on a different compute node within the same allocation. And in principle
any computer should be able to resolve that their own hostname points to the same
address as localhost. Still MacOS >= 12 seems to disable this look up for security
reasons. So on MacOS it is required to set this option to true
block_allocation (boolean): To accelerate the submission of a series of python functions with the same
resource requirements, executorlib supports block allocation. In this case all
resources have to be defined on the executor, rather than during the submission
of the individual function.
init_function (None): optional function to preset arguments for functions which are submitted later
"""
    max_cores = validate_number_of_cores(max_cores=max_cores, max_workers=max_workers)
    check_init_function(block_allocation=block_allocation, init_function=init_function)
    if flux_executor is not None and backend != "flux":
        backend = "flux"
    check_pmi(backend=backend, pmi=flux_executor_pmi_mode)
    executor_kwargs = {
        "cores": cores_per_worker,
        "hostname_localhost": hostname_localhost,
        "cwd": cwd,
        "cache_directory": cache_directory,
    }
    if backend == "flux":
        check_oversubscribe(oversubscribe=openmpi_oversubscribe)
        check_command_line_argument_lst(command_line_argument_lst=slurm_cmd_args)
        executor_kwargs["threads_per_core"] = threads_per_core
        executor_kwargs["gpus_per_core"] = int(gpus_per_worker / cores_per_worker)
        executor_kwargs["flux_executor"] = flux_executor
        executor_kwargs["flux_executor_pmi_mode"] = flux_executor_pmi_mode
        executor_kwargs["flux_executor_nesting"] = flux_executor_nesting
        if block_allocation:
            executor_kwargs["init_function"] = init_function
            return InteractiveExecutor(
                max_workers=int(max_cores / cores_per_worker),
                executor_kwargs=executor_kwargs,
                spawner=FluxPythonSpawner,
            )
        else:
            return InteractiveStepExecutor(
                max_cores=max_cores,
                executor_kwargs=executor_kwargs,
                spawner=FluxPythonSpawner,
            )
    elif backend == "slurm":
        check_executor(executor=flux_executor)
        check_nested_flux_executor(nested_flux_executor=flux_executor_nesting)
        executor_kwargs["threads_per_core"] = threads_per_core
        executor_kwargs["gpus_per_core"] = int(gpus_per_worker / cores_per_worker)
        executor_kwargs["slurm_cmd_args"] = slurm_cmd_args
        executor_kwargs["openmpi_oversubscribe"] = openmpi_oversubscribe
        if block_allocation:
            executor_kwargs["init_function"] = init_function
            return InteractiveExecutor(
                max_workers=int(max_cores / cores_per_worker),
                executor_kwargs=executor_kwargs,
                spawner=SrunSpawner,
            )
        else:
            return InteractiveStepExecutor(
                max_cores=max_cores,
                executor_kwargs=executor_kwargs,
                spawner=SrunSpawner,
            )
    else:  # backend="local"
        check_threads_per_core(threads_per_core=threads_per_core)
        check_gpus_per_worker(gpus_per_worker=gpus_per_worker)
        check_command_line_argument_lst(command_line_argument_lst=slurm_cmd_args)
        check_executor(executor=flux_executor)
        check_nested_flux_executor(nested_flux_executor=flux_executor_nesting)
        executor_kwargs["openmpi_oversubscribe"] = openmpi_oversubscribe
        if block_allocation:
            executor_kwargs["init_function"] = init_function
            return InteractiveExecutor(
                max_workers=int(max_cores / cores_per_worker),
                executor_kwargs=executor_kwargs,
                spawner=MpiExecSpawner,
            )
        else:
            return InteractiveStepExecutor(
                max_cores=max_cores,
                executor_kwargs=executor_kwargs,
                spawner=MpiExecSpawner,
            )
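
For context, a minimal usage sketch of the relocated factory function (assumptions: the local backend, a trivial illustrative function, and the post-refactor import path executorlib.interactive.executor shown in the first diff above):

from executorlib.interactive.executor import create_executor

def add(a, b):
    return a + b

# Local backend with a single core; block_allocation defaults to False.
exe = create_executor(backend="local", max_cores=1)
future = exe.submit(add, 1, 2)   # returns a concurrent.futures.Future
print(future.result())           # 3
exe.shutdown(wait=True)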