Commit 3f758bd

Merge remote-tracking branch 'origin/main' into more_error_tests
2 parents: dfd473d + 95c9480

24 files changed: +243 additions, -180 deletions

.github/workflows/mypy.yml

Lines changed: 22 additions & 0 deletions
@@ -0,0 +1,22 @@
+name: MyPy
+
+on:
+  push:
+    branches: [ main ]
+  pull_request:
+
+jobs:
+  mypy:
+    runs-on: ubuntu-latest
+    steps:
+      - name: Setup Python
+        uses: actions/setup-python@v5
+        with:
+          python-version: "3.13"
+          architecture: x64
+      - name: Checkout
+        uses: actions/checkout@v4
+      - name: Install mypy
+        run: pip install mypy
+      - name: Test
+        run: mypy --ignore-missing-imports ${{ github.event.repository.name }}

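Note: the workflow above runs mypy over the package directory named after the repository. The following is a minimal sketch of how the same check could be reproduced locally; the run_mypy helper is hypothetical and assumes mypy is installed in the current environment.

# Hypothetical helper mirroring the workflow's "Test" step; assumes mypy is installed.
import subprocess
import sys


def run_mypy(package: str = "executorlib") -> int:
    """Run mypy on the given package and return its exit code."""
    result = subprocess.run(
        [sys.executable, "-m", "mypy", "--ignore-missing-imports", package],
        check=False,
    )
    return result.returncode


if __name__ == "__main__":
    sys.exit(run_mypy())
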
executorlib/__init__.py

Lines changed: 5 additions & 5 deletions
@@ -1,4 +1,4 @@
-from typing import Optional
+from typing import Callable, Optional
 
 from executorlib._version import get_versions as _get_versions
 from executorlib.interactive.executor import (
@@ -16,7 +16,7 @@
 )
 
 __version__ = _get_versions()["version"]
-__all__ = []
+__all__: list = []
 
 
 class Executor:
@@ -100,7 +100,7 @@ def __init__(
         pysqa_config_directory: Optional[str] = None,
         hostname_localhost: Optional[bool] = None,
         block_allocation: bool = False,
-        init_function: Optional[callable] = None,
+        init_function: Optional[Callable] = None,
         disable_dependencies: bool = False,
         refresh_rate: float = 0.01,
         plot_dependency_graph: bool = False,
@@ -123,7 +123,7 @@ def __new__(
         pysqa_config_directory: Optional[str] = None,
         hostname_localhost: Optional[bool] = None,
         block_allocation: bool = False,
-        init_function: Optional[callable] = None,
+        init_function: Optional[Callable] = None,
        disable_dependencies: bool = False,
        refresh_rate: float = 0.01,
        plot_dependency_graph: bool = False,
@@ -177,7 +177,7 @@ def __new__(
             plot_dependency_graph_filename (str): Name of the file to store the plotted graph in.
 
         """
-        default_resource_dict = {
+        default_resource_dict: dict = {
             "cores": 1,
             "threads_per_core": 1,
             "gpus_per_core": 0,

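Note: the annotation changes above swap the builtin callable for typing.Callable. A minimal sketch of the distinction, assuming only the standard library:

from typing import Callable, Optional


# callable() is a builtin function, not a type, so mypy rejects Optional[callable];
# typing.Callable is the form it accepts in annotations.
def run_init(init_function: Optional[Callable] = None) -> None:
    if init_function is not None:
        init_function()


run_init(lambda: print("initialised"))
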
executorlib/backend/cache_parallel.py

Lines changed: 3 additions & 3 deletions
@@ -1,6 +1,7 @@
 import pickle
 import sys
 import time
+from typing import Any
 
 import cloudpickle
 
@@ -24,7 +25,7 @@ def main() -> None:
     """
     from mpi4py import MPI
 
-    MPI.pickle.__init__(
+    MPI.pickle.__init__(  # type: ignore
         cloudpickle.dumps,
         cloudpickle.loads,
         pickle.HIGHEST_PROTOCOL,
@@ -34,10 +35,9 @@ def main() -> None:
     file_name = sys.argv[1]
 
     time_start = time.time()
+    apply_dict = {}
     if mpi_rank_zero:
         apply_dict = backend_load_file(file_name=file_name)
-    else:
-        apply_dict = None
     apply_dict = MPI.COMM_WORLD.bcast(apply_dict, root=0)
     output = apply_dict["fn"].__call__(*apply_dict["args"], **apply_dict["kwargs"])
     if mpi_size_larger_one:

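Note: binding apply_dict to an empty dict on every rank (rather than None on non-root ranks) keeps the name at a single type for mypy, and MPI.COMM_WORLD.bcast still overwrites it with rank 0's payload. A minimal sketch of the pattern, assuming mpi4py is installed:

from mpi4py import MPI

comm = MPI.COMM_WORLD
# Bind the name on every rank with a consistent type; only rank 0 fills it in.
data: dict = {}
if comm.Get_rank() == 0:
    data = {"fn": sum, "args": ([1, 2, 3],), "kwargs": {}}
data = comm.bcast(data, root=0)  # every rank now holds rank 0's dict
print(data["fn"](*data["args"], **data["kwargs"]))
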
executorlib/backend/interactive_parallel.py

Lines changed: 6 additions & 6 deletions
@@ -1,8 +1,10 @@
 import pickle
 import sys
 from os.path import abspath
+from typing import Optional
 
 import cloudpickle
+import zmq
 
 from executorlib.standalone.interactive.backend import call_funct, parse_arguments
 from executorlib.standalone.interactive.communication import (
@@ -24,7 +26,7 @@ def main() -> None:
     """
     from mpi4py import MPI
 
-    MPI.pickle.__init__(
+    MPI.pickle.__init__(  # type: ignore
         cloudpickle.dumps,
         cloudpickle.loads,
         pickle.HIGHEST_PROTOCOL,
@@ -33,13 +35,12 @@ def main() -> None:
     mpi_size_larger_one = MPI.COMM_WORLD.Get_size() > 1
 
     argument_dict = parse_arguments(argument_lst=sys.argv)
+    context: Optional[zmq.Context] = None
+    socket: Optional[zmq.Socket] = None
     if mpi_rank_zero:
         context, socket = interface_connect(
             host=argument_dict["host"], port=argument_dict["zmqport"]
         )
-    else:
-        context = None
-        socket = None
 
     memory = None
 
@@ -50,10 +51,9 @@ def main() -> None:
 
     while True:
         # Read from socket
+        input_dict: dict = {}
         if mpi_rank_zero:
             input_dict = interface_receive(socket=socket)
-        else:
-            input_dict = None
         input_dict = MPI.COMM_WORLD.bcast(input_dict, root=0)
 
         # Parse input

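Note: pre-declaring context and socket with Optional[...] types replaces the removed else branch while giving mypy explicit types for both handles. A minimal sketch, assuming pyzmq is installed; connect_if_root and its arguments are hypothetical:

from typing import Optional, Tuple

import zmq


def connect_if_root(
    is_root: bool, host: str = "localhost", port: int = 5555
) -> Tuple[Optional[zmq.Context], Optional[zmq.Socket]]:
    # Bind both handles on every path so their type is known up front.
    context: Optional[zmq.Context] = None
    socket: Optional[zmq.Socket] = None
    if is_root:
        context = zmq.Context()
        socket = context.socket(zmq.PAIR)
        socket.connect(f"tcp://{host}:{port}")
    return context, socket
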
executorlib/base/executor.py

Lines changed: 25 additions & 21 deletions
@@ -5,7 +5,7 @@
 from concurrent.futures import (
     Future,
 )
-from typing import Optional
+from typing import Callable, List, Optional, Union
 
 from executorlib.standalone.inputcheck import check_resource_dict
 from executorlib.standalone.queue import cancel_items_in_queue
@@ -27,8 +27,8 @@ def __init__(self, max_cores: Optional[int] = None):
         """
         cloudpickle_register(ind=3)
         self._max_cores = max_cores
-        self._future_queue: queue.Queue = queue.Queue()
-        self._process: Optional[RaisingThread] = None
+        self._future_queue: Optional[queue.Queue] = queue.Queue()
+        self._process: Optional[Union[RaisingThread, List[RaisingThread]]] = None
 
     @property
     def info(self) -> Optional[dict]:
@@ -39,21 +39,21 @@ def info(self) -> Optional[dict]:
             Optional[dict]: Information about the executor.
         """
         if self._process is not None and isinstance(self._process, list):
-            meta_data_dict = self._process[0]._kwargs.copy()
+            meta_data_dict = self._process[0].get_kwargs().copy()
             if "future_queue" in meta_data_dict.keys():
                 del meta_data_dict["future_queue"]
             meta_data_dict["max_workers"] = len(self._process)
             return meta_data_dict
         elif self._process is not None:
-            meta_data_dict = self._process._kwargs.copy()
+            meta_data_dict = self._process.get_kwargs().copy()
             if "future_queue" in meta_data_dict.keys():
                 del meta_data_dict["future_queue"]
             return meta_data_dict
         else:
             return None
 
     @property
-    def future_queue(self) -> queue.Queue:
+    def future_queue(self) -> Optional[queue.Queue]:
         """
         Get the future queue.
 
@@ -62,7 +62,7 @@ def future_queue(self) -> queue.Queue:
         """
         return self._future_queue
 
-    def submit(self, fn: callable, *args, resource_dict: dict = {}, **kwargs) -> Future:
+    def submit(self, fn: Callable, *args, resource_dict: dict = {}, **kwargs) -> Future:  # type: ignore
         """
         Submits a callable to be executed with the given arguments.
 
@@ -97,16 +97,17 @@ def submit(self, fn: callable, *args, resource_dict: dict = {}, **kwargs) -> Future:
                 "The specified number of cores is larger than the available number of cores."
             )
         check_resource_dict(function=fn)
-        f = Future()
-        self._future_queue.put(
-            {
-                "fn": fn,
-                "args": args,
-                "kwargs": kwargs,
-                "future": f,
-                "resource_dict": resource_dict,
-            }
-        )
+        f: Future = Future()
+        if self._future_queue is not None:
+            self._future_queue.put(
+                {
+                    "fn": fn,
+                    "args": args,
+                    "kwargs": kwargs,
+                    "future": f,
+                    "resource_dict": resource_dict,
+                }
+            )
         return f
 
     def shutdown(self, wait: bool = True, *, cancel_futures: bool = False):
@@ -124,11 +125,11 @@ def shutdown(self, wait: bool = True, *, cancel_futures: bool = False):
             futures. Futures that are completed or running will not be
             cancelled.
         """
-        if cancel_futures:
+        if cancel_futures and self._future_queue is not None:
             cancel_items_in_queue(que=self._future_queue)
-        if self._process is not None:
+        if self._process is not None and self._future_queue is not None:
             self._future_queue.put({"shutdown": True, "wait": wait})
-            if wait:
+            if wait and isinstance(self._process, RaisingThread):
                 self._process.join()
                 self._future_queue.join()
         self._process = None
@@ -151,7 +152,10 @@ def __len__(self) -> int:
         Returns:
             int: The length of the executor.
         """
-        return self._future_queue.qsize()
+        queue_size = 0
+        if self._future_queue is not None:
+            queue_size = self._future_queue.qsize()
+        return queue_size
 
     def __del__(self):
         """

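Note: with _future_queue now typed as Optional[queue.Queue], each access in the class is guarded by a None check. A minimal sketch of the guard used in __len__, assuming only the standard library; MiniExecutor is a hypothetical stand-in for ExecutorBase:

import queue
from typing import Optional


class MiniExecutor:
    def __init__(self) -> None:
        self._future_queue: Optional[queue.Queue] = queue.Queue()

    def __len__(self) -> int:
        # Guard the access so mypy accepts the Optional type and the call
        # stays safe if the queue has already been released.
        queue_size = 0
        if self._future_queue is not None:
            queue_size = self._future_queue.qsize()
        return queue_size
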
executorlib/cache/executor.py

Lines changed: 9 additions & 9 deletions
@@ -1,5 +1,5 @@
 import os
-from typing import Optional
+from typing import Callable, Optional
 
 from executorlib.base.executor import ExecutorBase
 from executorlib.cache.shared import execute_tasks_h5
@@ -21,16 +21,16 @@
     from executorlib.cache.queue_spawner import execute_with_pysqa
 except ImportError:
     # If pysqa is not available fall back to executing tasks in a subprocess
-    execute_with_pysqa = execute_in_subprocess
+    execute_with_pysqa = execute_in_subprocess  # type: ignore
 
 
 class FileExecutor(ExecutorBase):
     def __init__(
         self,
         cache_directory: str = "cache",
         resource_dict: Optional[dict] = None,
-        execute_function: callable = execute_with_pysqa,
-        terminate_function: Optional[callable] = None,
+        execute_function: Callable = execute_with_pysqa,
+        terminate_function: Optional[Callable] = None,
         pysqa_config_directory: Optional[str] = None,
         backend: Optional[str] = None,
         disable_dependencies: bool = False,
@@ -43,8 +43,8 @@ def __init__(
             resource_dict (dict): A dictionary of resources required by the task. With the following keys:
                                   - cores (int): number of MPI cores to be used for each function call
                                   - cwd (str/None): current working directory where the parallel python task is executed
-            execute_function (callable, optional): The function to execute tasks. Defaults to execute_in_subprocess.
-            terminate_function (callable, optional): The function to terminate the tasks.
+            execute_function (Callable, optional): The function to execute tasks. Defaults to execute_in_subprocess.
+            terminate_function (Callable, optional): The function to terminate the tasks.
             pysqa_config_directory (str, optional): path to the pysqa config directory (only for pysqa based backend).
             backend (str, optional): name of the backend used to spawn tasks.
             disable_dependencies (boolean): Disable resolving future objects during the submission.
@@ -81,9 +81,9 @@ def __init__(
 
 
 def create_file_executor(
-    max_workers: int = 1,
+    max_workers: Optional[int] = None,
     backend: str = "flux_submission",
-    max_cores: int = 1,
+    max_cores: Optional[int] = None,
     cache_directory: Optional[str] = None,
     resource_dict: Optional[dict] = None,
     flux_executor=None,
@@ -93,7 +93,7 @@ def create_file_executor(
     pysqa_config_directory: Optional[str] = None,
     hostname_localhost: Optional[bool] = None,
     block_allocation: bool = False,
-    init_function: Optional[callable] = None,
+    init_function: Optional[Callable] = None,
     disable_dependencies: bool = False,
 ):
     if cache_directory is None:

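Note: the try/except above keeps the pysqa-backed spawner optional and falls back to subprocess execution, with # type: ignore silencing mypy's complaint about re-binding an imported name. A rough sketch of the pattern with hypothetical module and function names:

import subprocess
from typing import List


def run_in_subprocess(command: List[str]) -> None:
    """Fallback executor used when the optional scheduler package is missing."""
    subprocess.run(command, check=True)


try:
    # Hypothetical optional dependency; the real code imports execute_with_pysqa.
    from some_scheduler import submit_to_queue as run_task  # type: ignore
except ImportError:
    run_task = run_in_subprocess  # type: ignore
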
executorlib/cache/queue_spawner.py

Lines changed: 1 addition & 1 deletion
@@ -16,7 +16,7 @@ def execute_with_pysqa(
     config_directory: Optional[str] = None,
     backend: Optional[str] = None,
     cache_directory: Optional[str] = None,
-) -> Tuple[int, int]:
+) -> Optional[int]:
     """
     Execute a command by submitting it to the queuing system
 
executorlib/cache/shared.py

Lines changed: 9 additions & 7 deletions
@@ -3,7 +3,7 @@
 import queue
 import sys
 from concurrent.futures import Future
-from typing import Optional, Tuple
+from typing import Any, Callable, Optional, Tuple
 
 from executorlib.standalone.command import get_command_path
 from executorlib.standalone.hdf import dump, get_output
@@ -21,7 +21,7 @@ def __init__(self, file_name: str):
         """
         self._file_name = file_name
 
-    def result(self) -> str:
+    def result(self) -> Any:
         """
         Get the result of the future item.
 
@@ -49,9 +49,9 @@ def done(self) -> bool:
 def execute_tasks_h5(
     future_queue: queue.Queue,
     cache_directory: str,
-    execute_function: callable,
+    execute_function: Callable,
     resource_dict: dict,
-    terminate_function: Optional[callable] = None,
+    terminate_function: Optional[Callable] = None,
     pysqa_config_directory: Optional[str] = None,
     backend: Optional[str] = None,
     disable_dependencies: bool = False,
@@ -65,16 +65,18 @@ def execute_tasks_h5(
         resource_dict (dict): A dictionary of resources required by the task. With the following keys:
                               - cores (int): number of MPI cores to be used for each function call
                               - cwd (str/None): current working directory where the parallel python task is executed
-        execute_function (callable): The function to execute the tasks.
-        terminate_function (callable): The function to terminate the tasks.
+        execute_function (Callable): The function to execute the tasks.
+        terminate_function (Callable): The function to terminate the tasks.
         pysqa_config_directory (str, optional): path to the pysqa config directory (only for pysqa based backend).
        backend (str, optional): name of the backend used to spawn tasks.
 
     Returns:
        None
 
    """
-    memory_dict, process_dict, file_name_dict = {}, {}, {}
+    memory_dict: dict = {}
+    process_dict: dict = {}
+    file_name_dict: dict = {}
     while True:
         task_dict = None
         try:

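Note: FutureItem.result() is now annotated as returning Any because the cached object is whatever the submitted function produced. A minimal sketch of why Any is the honest annotation for such a loader; load_cached_result is a hypothetical helper using pickle instead of the HDF5 reader used here:

import pickle
from typing import Any


def load_cached_result(file_name: str) -> Any:
    # The unpickled object has no statically known type, so Any is the
    # only accurate annotation.
    with open(file_name, "rb") as handle:
        return pickle.load(handle)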