
Commit 78062fd

Update validate_number_of_cores() (#488)
* Update validate_number_of_cores()
* [pre-commit.ci] auto fixes from pre-commit.com hooks; for more information, see https://pre-commit.ci
* fixes
* [pre-commit.ci] auto fixes from pre-commit.com hooks; for more information, see https://pre-commit.ci
* fix test

Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
1 parent 90167f7 commit 78062fd

File tree (5 files changed, +82 −23 lines):

executorlib/interactive/executor.py
executorlib/interactive/shared.py
executorlib/standalone/inputcheck.py
tests/test_executor_backend_mpi.py
tests/test_shared_input_check.py

executorlib/interactive/executor.py

Lines changed: 21 additions & 4 deletions
@@ -197,7 +197,6 @@ def create_executor(
                                   of the individual function.
         init_function (None): optional function to preset arguments for functions which are submitted later
     """
-    max_cores = validate_number_of_cores(max_cores=max_cores, max_workers=max_workers)
     check_init_function(block_allocation=block_allocation, init_function=init_function)
     if flux_executor is not None and backend != "flux":
         backend = "flux"
@@ -218,13 +217,19 @@ def create_executor(
         if block_allocation:
             resource_dict["init_function"] = init_function
             return InteractiveExecutor(
-                max_workers=int(max_cores / cores_per_worker),
+                max_workers=validate_number_of_cores(
+                    max_cores=max_cores,
+                    max_workers=max_workers,
+                    cores_per_worker=cores_per_worker,
+                    set_local_cores=False,
+                ),
                 executor_kwargs=resource_dict,
                 spawner=FluxPythonSpawner,
             )
         else:
             return InteractiveStepExecutor(
                 max_cores=max_cores,
+                max_workers=max_workers,
                 executor_kwargs=resource_dict,
                 spawner=FluxPythonSpawner,
             )
@@ -234,13 +239,19 @@ def create_executor(
         if block_allocation:
             resource_dict["init_function"] = init_function
             return InteractiveExecutor(
-                max_workers=int(max_cores / cores_per_worker),
+                max_workers=validate_number_of_cores(
+                    max_cores=max_cores,
+                    max_workers=max_workers,
+                    cores_per_worker=cores_per_worker,
+                    set_local_cores=False,
+                ),
                 executor_kwargs=resource_dict,
                 spawner=SrunSpawner,
             )
         else:
             return InteractiveStepExecutor(
                 max_cores=max_cores,
+                max_workers=max_workers,
                 executor_kwargs=resource_dict,
                 spawner=SrunSpawner,
             )
@@ -258,13 +269,19 @@ def create_executor(
         if block_allocation:
             resource_dict["init_function"] = init_function
             return InteractiveExecutor(
-                max_workers=int(max_cores / cores_per_worker),
+                max_workers=validate_number_of_cores(
+                    max_cores=max_cores,
+                    max_workers=max_workers,
+                    cores_per_worker=cores_per_worker,
+                    set_local_cores=True,
+                ),
                 executor_kwargs=resource_dict,
                 spawner=MpiExecSpawner,
             )
         else:
             return InteractiveStepExecutor(
                 max_cores=max_cores,
+                max_workers=max_workers,
                 executor_kwargs=resource_dict,
                 spawner=MpiExecSpawner,
             )
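
With this change, create_executor() no longer collapses max_cores up front; the block-allocation branches instead hand both limits to validate_number_of_cores() and derive the worker count from max_cores and cores_per_worker. The following is a minimal usage sketch, not the canonical API: the `from executorlib import Executor` entry point is assumed, while the backend="local" / block_allocation=True / resource_dict usage mirrors the test updated in this commit.

from executorlib import Executor  # assumed public entry point wrapping create_executor()


def sum_of_two(a, b):
    return a + b


# Block allocation with an explicit core budget: max_cores=2 with one core per
# worker resolves to int(2 / 1) = 2 workers inside validate_number_of_cores().
with Executor(
    max_cores=2,
    resource_dict={"cores": 1},
    backend="local",
    block_allocation=True,
) as exe:
    future = exe.submit(sum_of_two, 1, 2)
    print(future.result())  # 3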

executorlib/interactive/shared.py

Lines changed: 32 additions & 6 deletions
@@ -179,14 +179,16 @@ class InteractiveStepExecutor(ExecutorBase):

     def __init__(
         self,
-        max_cores: int = 1,
+        max_cores: Optional[int] = None,
+        max_workers: Optional[int] = None,
         executor_kwargs: dict = {},
         spawner: BaseSpawner = MpiExecSpawner,
     ):
         super().__init__(max_cores=executor_kwargs.get("max_cores", None))
         executor_kwargs["future_queue"] = self._future_queue
         executor_kwargs["spawner"] = spawner
         executor_kwargs["max_cores"] = max_cores
+        executor_kwargs["max_workers"] = max_workers
         self._set_process(
             RaisingThread(
                 target=execute_separate_tasks,
@@ -256,7 +258,8 @@ def execute_parallel_tasks(
 def execute_separate_tasks(
     future_queue: queue.Queue,
     spawner: BaseSpawner = MpiExecSpawner,
-    max_cores: int = 1,
+    max_cores: Optional[int] = None,
+    max_workers: Optional[int] = None,
     hostname_localhost: Optional[bool] = None,
     **kwargs,
 ):
@@ -267,6 +270,9 @@ def execute_separate_tasks(
         future_queue (queue.Queue): task queue of dictionary objects which are submitted to the parallel process
         spawner (BaseSpawner): Interface to start process on selected compute resources
         max_cores (int): defines the number cores which can be used in parallel
+        max_workers (int): for backwards compatibility with the standard library, max_workers also defines the number of
+                           cores which can be used in parallel - just like the max_cores parameter. Using max_cores is
+                           recommended, as computers have a limited number of compute cores.
         hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the
                                       context of an HPC cluster this essential to be able to communicate to an
                                       Executor running on a different compute node within the same allocation. And
@@ -296,6 +302,7 @@ def execute_separate_tasks(
             spawner=spawner,
             executor_kwargs=kwargs,
             max_cores=max_cores,
+            max_workers=max_workers,
             hostname_localhost=hostname_localhost,
         )
         qtask_lst.append(qtask)
@@ -389,7 +396,10 @@ def _get_backend_path(


 def _wait_for_free_slots(
-    active_task_dict: dict, cores_requested: int, max_cores: int
+    active_task_dict: dict,
+    cores_requested: int,
+    max_cores: Optional[int] = None,
+    max_workers: Optional[int] = None,
 ) -> dict:
     """
     Wait for available computing resources to become available.
@@ -398,12 +408,23 @@ def _wait_for_free_slots(
         active_task_dict (dict): Dictionary containing the future objects and the number of cores they require
         cores_requested (int): Number of cores required for executing the next task
         max_cores (int): Maximum number cores which can be used
+        max_workers (int): for backwards compatibility with the standard library, max_workers also defines the number of
+                           cores which can be used in parallel - just like the max_cores parameter. Using max_cores is
+                           recommended, as computers have a limited number of compute cores.

     Returns:
         dict: Dictionary containing the future objects and the number of cores they require
     """
-    while sum(active_task_dict.values()) + cores_requested > max_cores:
-        active_task_dict = {k: v for k, v in active_task_dict.items() if not k.done()}
+    if max_cores is not None:
+        while sum(active_task_dict.values()) + cores_requested > max_cores:
+            active_task_dict = {
+                k: v for k, v in active_task_dict.items() if not k.done()
+            }
+    elif max_workers is not None and max_cores is None:
+        while len(active_task_dict.values()) + 1 > max_workers:
+            active_task_dict = {
+                k: v for k, v in active_task_dict.items() if not k.done()
+            }
     return active_task_dict


@@ -490,7 +511,8 @@ def _submit_function_to_separate_process(
     qtask: queue.Queue,
     spawner: BaseSpawner,
     executor_kwargs: dict,
-    max_cores: int = 1,
+    max_cores: Optional[int] = None,
+    max_workers: Optional[int] = None,
     hostname_localhost: Optional[bool] = None,
 ):
     """
@@ -503,6 +525,9 @@ def _submit_function_to_separate_process(
         spawner (BaseSpawner): Interface to start process on selected compute resources
         executor_kwargs (dict): keyword parameters used to initialize the Executor
         max_cores (int): defines the number cores which can be used in parallel
+        max_workers (int): for backwards compatibility with the standard library, max_workers also defines the number of
+                           cores which can be used in parallel - just like the max_cores parameter. Using max_cores is
+                           recommended, as computers have a limited number of compute cores.
         hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the
                                       context of an HPC cluster this essential to be able to communicate to an
                                       Executor running on a different compute node within the same allocation. And
@@ -525,6 +550,7 @@ def _submit_function_to_separate_process(
         active_task_dict=active_task_dict,
         cores_requested=resource_dict["cores"],
         max_cores=max_cores,
+        max_workers=max_workers,
     )
     active_task_dict[task_dict["future"]] = resource_dict["cores"]
     task_kwargs = executor_kwargs.copy()
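
The scheduling change is easiest to see in isolation: with max_cores set, _wait_for_free_slots() blocks while the cores held by unfinished futures plus the new request exceed the core budget; with only max_workers set, it blocks on the number of unfinished futures instead. Below is a self-contained sketch of that logic (the function name wait_for_free_slots_sketch is mine, and plain concurrent.futures.Future objects stand in for executorlib task futures).

from concurrent.futures import Future
from typing import Optional


def wait_for_free_slots_sketch(
    active_task_dict: dict,
    cores_requested: int,
    max_cores: Optional[int] = None,
    max_workers: Optional[int] = None,
) -> dict:
    # Core-based limit: block while unfinished tasks plus the new request
    # would exceed max_cores, pruning finished futures on each pass.
    if max_cores is not None:
        while sum(active_task_dict.values()) + cores_requested > max_cores:
            active_task_dict = {
                k: v for k, v in active_task_dict.items() if not k.done()
            }
    # Worker-based limit: block while the count of unfinished tasks plus the
    # new one would exceed max_workers.
    elif max_workers is not None:
        while len(active_task_dict.values()) + 1 > max_workers:
            active_task_dict = {
                k: v for k, v in active_task_dict.items() if not k.done()
            }
    return active_task_dict


# Example: two finished 2-core tasks are pruned before a new 2-core task is
# admitted under max_cores=4; the returned dict is empty.
f1, f2 = Future(), Future()
f1.set_result(None)
f2.set_result(None)
print(wait_for_free_slots_sketch({f1: 2, f2: 2}, cores_requested=2, max_cores=4))  # {}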

executorlib/standalone/inputcheck.py

Lines changed: 14 additions & 7 deletions
@@ -170,14 +170,21 @@ def check_pysqa_config_directory(pysqa_config_directory: Optional[str]) -> None:


 def validate_number_of_cores(
-    max_cores: Optional[int], max_workers: Optional[int]
+    max_cores: Optional[int] = None,
+    max_workers: Optional[int] = None,
+    cores_per_worker: Optional[int] = None,
+    set_local_cores: bool = False,
 ) -> int:
     """
     Validate the number of cores and return the appropriate value.
     """
-    if max_workers is None and max_cores is None:
-        return multiprocessing.cpu_count()
-    elif max_workers is not None and max_cores is None:
-        return max_workers
-    else:
-        return max_cores
+    if max_cores is None and max_workers is None:
+        if not set_local_cores:
+            raise ValueError(
+                "Block allocation requires a fixed set of computational resources. Neither max_cores nor max_workers are defined."
+            )
+        else:
+            max_workers = multiprocessing.cpu_count()
+    elif max_cores is not None and max_workers is None:
+        max_workers = int(max_cores / cores_per_worker)
+    return max_workers
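
The reworked helper now always answers "how many workers?" rather than "how many cores?". A few hedged calls illustrating the branches of the new implementation shown above (the import path follows the file location in this commit):

from executorlib.standalone.inputcheck import validate_number_of_cores

# max_cores is translated into a worker count: int(4 / 2) = 2 workers.
print(validate_number_of_cores(max_cores=4, max_workers=None, cores_per_worker=2))  # 2

# Neither limit set: only allowed when falling back to the local CPU count.
print(validate_number_of_cores(set_local_cores=True))  # multiprocessing.cpu_count()

# Without set_local_cores the same call raises a ValueError, because block
# allocation needs a fixed resource budget.
try:
    validate_number_of_cores()
except ValueError as err:
    print(err)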

tests/test_executor_backend_mpi.py

Lines changed: 1 addition & 1 deletion
@@ -97,7 +97,7 @@ def tearDown(self):
     )
     def test_meta_executor_parallel_cache(self):
         with Executor(
-            max_workers=2,
+            max_cores=2,
             resource_dict={"cores": 2},
             backend="local",
             block_allocation=True,
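
The test presumably switches from max_workers=2 to max_cores=2 because the two limits now mean different things when each worker uses 2 cores: max_cores=2 resolves to a single 2-core worker, while max_workers=2 would keep two workers and therefore claim 4 cores. A quick check of that arithmetic against the updated helper:

from executorlib.standalone.inputcheck import validate_number_of_cores

# max_cores=2 with cores_per_worker=2 -> int(2 / 2) = 1 worker (2 cores total).
print(validate_number_of_cores(max_cores=2, max_workers=None, cores_per_worker=2))  # 1

# max_workers=2 is taken at face value -> 2 workers (4 cores total at 2 cores each).
print(validate_number_of_cores(max_cores=None, max_workers=2, cores_per_worker=2))  # 2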

tests/test_shared_input_check.py

Lines changed: 14 additions & 5 deletions
@@ -98,12 +98,21 @@ def test_check_pysqa_config_directory(self):
             check_pysqa_config_directory(pysqa_config_directory="path/to/config")

     def test_validate_number_of_cores(self):
+        with self.assertRaises(ValueError):
+            validate_number_of_cores(
+                max_cores=None, max_workers=None, cores_per_worker=None
+            )
+        with self.assertRaises(TypeError):
+            validate_number_of_cores(
+                max_cores=1, max_workers=None, cores_per_worker=None
+            )
         self.assertIsInstance(
-            validate_number_of_cores(max_cores=None, max_workers=None), int
-        )
-        self.assertIsInstance(
-            validate_number_of_cores(max_cores=1, max_workers=None), int
+            validate_number_of_cores(max_cores=1, max_workers=None, cores_per_worker=1),
+            int,
         )
         self.assertIsInstance(
-            validate_number_of_cores(max_cores=None, max_workers=1), int
+            validate_number_of_cores(
+                max_cores=None, max_workers=1, cores_per_worker=None
+            ),
+            int,
         )
