Skip to content

Commit

Permalink
Merge pull request #247 from pyiron/no_more_wait
Browse files Browse the repository at this point in the history
Use queue.Queue().get() rather than queue.Queue().get_nowait()
  • Loading branch information
jan-janssen authored Jan 31, 2024
2 parents c841760 + 38f20ab commit aa1e87e
Show file tree
Hide file tree
Showing 6 changed files with 7 additions and 30 deletions.
7 changes: 0 additions & 7 deletions pympipool/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ class Executor:
oversubscribe (bool): adds the `--oversubscribe` command line flag (OpenMPI only) - default False
init_function (None): optional function to preset arguments for functions which are submitted later
cwd (str/None): current working directory where the parallel python task is executed
sleep_interval (float): synchronization interval - default 0.1
hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the
context of an HPC cluster this essential to be able to communicate to an
Executor running on a different compute node within the same allocation. And
Expand Down Expand Up @@ -77,7 +76,6 @@ def __init__(
oversubscribe=False,
init_function=None,
cwd=None,
sleep_interval=0.1,
executor=None,
hostname_localhost=False,
):
Expand All @@ -93,7 +91,6 @@ def __new__(
oversubscribe=False,
init_function=None,
cwd=None,
sleep_interval=0.1,
executor=None,
hostname_localhost=False,
):
Expand All @@ -113,7 +110,6 @@ def __new__(
oversubscribe (bool): adds the `--oversubscribe` command line flag (OpenMPI only) - default False
init_function (None): optional function to preset arguments for functions which are submitted later
cwd (str/None): current working directory where the parallel python task is executed
sleep_interval (float): synchronization interval - default 0.1
hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the
context of an HPC cluster this essential to be able to communicate to an
Executor running on a different compute node within the same allocation. And
Expand All @@ -136,7 +132,6 @@ def __new__(
gpus_per_worker=gpus_per_worker,
init_function=init_function,
cwd=cwd,
sleep_interval=sleep_interval,
hostname_localhost=hostname_localhost,
)
elif slurm_installed:
Expand All @@ -145,7 +140,6 @@ def __new__(
cores_per_worker=cores_per_worker,
init_function=init_function,
cwd=cwd,
sleep_interval=sleep_interval,
hostname_localhost=hostname_localhost,
)
else:
Expand All @@ -168,6 +162,5 @@ def __new__(
cores_per_worker=cores_per_worker,
init_function=init_function,
cwd=cwd,
sleep_interval=sleep_interval,
hostname_localhost=hostname_localhost,
)
3 changes: 0 additions & 3 deletions pympipool/flux/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ class PyFluxExecutor(ExecutorBase):
gpus_per_worker (int): number of GPUs per worker - defaults to 0
init_function (None): optional function to preset arguments for functions which are submitted later
cwd (str/None): current working directory where the parallel python task is executed
sleep_interval (float): synchronization interval - default 0.1
executor (flux.job.FluxExecutor): Flux Python interface to submit the workers to flux
hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the
context of an HPC cluster this essential to be able to communicate to an
Expand Down Expand Up @@ -65,7 +64,6 @@ def __init__(
gpus_per_worker=0,
init_function=None,
cwd=None,
sleep_interval=0.1,
executor=None,
hostname_localhost=False,
):
Expand All @@ -76,7 +74,6 @@ def __init__(
# Broker Arguments
"future_queue": self._future_queue,
"max_workers": max_workers,
"sleep_interval": sleep_interval,
"hostname_localhost": hostname_localhost,
"executor_class": PyFluxSingleTaskExecutor,
# Executor Arguments
Expand Down
3 changes: 0 additions & 3 deletions pympipool/mpi/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ class PyMPIExecutor(ExecutorBase):
oversubscribe (bool): adds the `--oversubscribe` command line flag (OpenMPI only) - default False
init_function (None): optional function to preset arguments for functions which are submitted later
cwd (str/None): current working directory where the parallel python task is executed
sleep_interval (float): synchronization interval - default 0.1
hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the
context of an HPC cluster this essential to be able to communicate to an
Executor running on a different compute node within the same allocation. And
Expand Down Expand Up @@ -59,7 +58,6 @@ def __init__(
oversubscribe=False,
init_function=None,
cwd=None,
sleep_interval=0.1,
hostname_localhost=False,
):
super().__init__()
Expand All @@ -69,7 +67,6 @@ def __init__(
# Broker Arguments
"future_queue": self._future_queue,
"max_workers": max_workers,
"sleep_interval": sleep_interval,
"executor_class": PyMPISingleTaskExecutor,
"hostname_localhost": hostname_localhost,
# Executor Arguments
Expand Down
19 changes: 7 additions & 12 deletions pympipool/shared/executorbase.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
import os
import queue
import sys
from time import sleep

import cloudpickle

Expand Down Expand Up @@ -176,7 +175,6 @@ def executor_broker(
future_queue,
max_workers,
executor_class,
sleep_interval=0.1,
**kwargs,
):
meta_future_lst = _get_executor_dict(
Expand All @@ -185,17 +183,14 @@ def executor_broker(
**kwargs,
)
while True:
try:
task_dict = future_queue.get_nowait()
except queue.Empty:
sleep(sleep_interval)
if execute_task_dict(
task_dict=future_queue.get(), meta_future_lst=meta_future_lst
):
future_queue.task_done()
else:
if execute_task_dict(task_dict=task_dict, meta_future_lst=meta_future_lst):
future_queue.task_done()
else:
future_queue.task_done()
future_queue.join()
break
future_queue.task_done()
future_queue.join()
break


def execute_task_dict(task_dict, meta_future_lst):
Expand Down
3 changes: 0 additions & 3 deletions pympipool/shell/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,6 @@ class SubprocessExecutor(ExecutorBase):
Args:
max_workers (int): defines the number workers which can execute functions in parallel
sleep_interval (float): synchronization interval - default 0.1
Examples:
Expand All @@ -82,7 +81,6 @@ class SubprocessExecutor(ExecutorBase):
def __init__(
self,
max_workers=1,
sleep_interval=0.1,
):
super().__init__()
self._process = RaisingThread(
Expand All @@ -91,7 +89,6 @@ def __init__(
# Broker Arguments
"future_queue": self._future_queue,
"max_workers": max_workers,
"sleep_interval": sleep_interval,
"executor_class": SubprocessSingleExecutor,
},
)
Expand Down
2 changes: 0 additions & 2 deletions pympipool/slurm/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,6 @@ def __init__(
oversubscribe=False,
init_function=None,
cwd=None,
sleep_interval=0.1,
hostname_localhost=False,
):
super().__init__()
Expand All @@ -72,7 +71,6 @@ def __init__(
# Broker Arguments
"future_queue": self._future_queue,
"max_workers": max_workers,
"sleep_interval": sleep_interval,
"hostname_localhost": hostname_localhost,
"executor_class": PySlurmSingleTaskExecutor,
# Executor Arguments
Expand Down

0 comments on commit aa1e87e

Please sign in to comment.