Skip to content
2 changes: 2 additions & 0 deletions src/executorlib/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from executorlib.executor.single import TestClusterExecutor
from executorlib.standalone.command import get_command_path
from executorlib.standalone.interactive.communication import (
ExecutorlibSocketError,
SocketInterface,
interface_bootup,
interface_connect,
Expand All @@ -32,4 +33,5 @@
"MpiExecSpawner",
"SocketInterface",
"SubprocessSpawner",
"ExecutorlibSocketError",
]
6 changes: 6 additions & 0 deletions src/executorlib/executor/flux.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ class FluxJobExecutor(BaseExecutor):
- error_log_file (str): Name of the error log file to use for storing exceptions raised
by the Python functions submitted to the Executor.
- restart_limit (int): The maximum number of restarting worker processes. Default: 0
- run_time_limit (int): The maximum runtime in seconds for each task. Default: None
pmi_mode (str): PMI interface to use (OpenMPI v5 requires pmix) default is None
flux_executor (flux.job.FluxExecutor): Flux Python interface to submit the workers to flux
flux_executor_nesting (bool): Provide hierarchically nested Flux job scheduler inside the submitted function.
Expand Down Expand Up @@ -136,6 +137,7 @@ def __init__(
compute nodes. Defaults to False.
- error_log_file (str): Name of the error log file to use for storing exceptions
raised by the Python functions submitted to the Executor.
- run_time_limit (int): The maximum runtime in seconds for each task. Default: None
pmi_mode (str): PMI interface to use (OpenMPI v5 requires pmix) default is None
flux_executor (flux.job.FluxExecutor): Flux Python interface to submit the workers to flux
flux_executor_nesting (bool): Provide hierarchically nested Flux job scheduler inside the submitted function.
Expand Down Expand Up @@ -246,6 +248,7 @@ class FluxClusterExecutor(BaseExecutor):
- slurm_cmd_args (list): Additional command line arguments for the srun call (SLURM only)
- error_log_file (str): Name of the error log file to use for storing exceptions raised
by the Python functions submitted to the Executor.
- run_time_limit (int): The maximum runtime in seconds for each task. Default: None
pysqa_config_directory (str, optional): path to the pysqa config directory (only for pysqa based backend).
pmi_mode (str): PMI interface to use (OpenMPI v5 requires pmix) default is None
hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the
Expand Down Expand Up @@ -333,6 +336,7 @@ def __init__(
only)
- error_log_file (str): Name of the error log file to use for storing exceptions
raised by the Python functions submitted to the Executor.
- run_time_limit (int): The maximum runtime in seconds for each task. Default: None
pysqa_config_directory (str, optional): path to the pysqa config directory (only for pysqa based backend).
pmi_mode (str): PMI interface to use (OpenMPI v5 requires pmix) default is None
hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the
Expand Down Expand Up @@ -364,6 +368,7 @@ def __init__(
"cwd": None,
"openmpi_oversubscribe": False,
"slurm_cmd_args": [],
"run_time_limit": None,
}
if resource_dict is None:
resource_dict = {}
Expand Down Expand Up @@ -478,6 +483,7 @@ def create_flux_executor(
compute nodes. Defaults to False.
- error_log_file (str): Name of the error log file to use for storing exceptions raised
by the Python functions submitted to the Executor.
- run_time_limit (int): The maximum runtime in seconds for each task. Default: None
pmi_mode (str): PMI interface to use (OpenMPI v5 requires pmix) default is None
flux_executor (flux.job.FluxExecutor): Flux Python interface to submit the workers to flux
flux_executor_nesting (bool): Provide hierarchically nested Flux job scheduler inside the submitted function.
Expand Down
4 changes: 4 additions & 0 deletions src/executorlib/executor/single.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ class SingleNodeExecutor(BaseExecutor):
- slurm_cmd_args (list): Additional command line arguments for the srun call (SLURM only)
- error_log_file (str): Name of the error log file to use for storing exceptions raised
by the Python functions submitted to the Executor.
- run_time_limit (int): The maximum runtime in seconds for each task. Default: None
hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the
context of an HPC cluster this is essential to be able to communicate to an
Executor running on a different compute node within the same allocation. And
Expand Down Expand Up @@ -126,6 +127,7 @@ def __init__(
- error_log_file (str): Name of the error log file to use for storing exceptions
raised by the Python functions submitted to the Executor.
- restart_limit (int): The maximum number of restarting worker processes. Default: 0
- run_time_limit (int): The maximum runtime in seconds for each task. Default: None
hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the
context of an HPC cluster this is essential to be able to communicate to an
Executor running on a different compute node within the same allocation. And
Expand Down Expand Up @@ -219,6 +221,7 @@ class TestClusterExecutor(BaseExecutor):
- cwd (str/None): current working directory where the parallel python task is executed
- error_log_file (str): Name of the error log file to use for storing exceptions raised
by the Python functions submitted to the Executor.
- run_time_limit (int): The maximum runtime in seconds for each task. Default: None
hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the
context of an HPC cluster this is essential to be able to communicate to an
Executor running on a different compute node within the same allocation. And
Expand Down Expand Up @@ -296,6 +299,7 @@ def __init__(
- cwd (str/None): current working directory where the parallel python task is executed
- error_log_file (str): Name of the error log file to use for storing exceptions
raised by the Python functions submitted to the Executor.
- run_time_limit (int): The maximum runtime in seconds for each task. Default: None
hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the
context of an HPC cluster this is essential to be able to communicate to an
Executor running on a different compute node within the same allocation. And
Expand Down
5 changes: 5 additions & 0 deletions src/executorlib/executor/slurm.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ class SlurmClusterExecutor(BaseExecutor):
- error_log_file (str): Name of the error log file to use for storing exceptions raised
by the Python functions submitted to the Executor.
- restart_limit (int): The maximum number of restarting worker processes. Default: 0
- run_time_limit (int): The maximum runtime in seconds for each task. Default: None
pysqa_config_directory (str, optional): path to the pysqa config directory (only for pysqa based backend).
pmi_mode (str): PMI interface to use (OpenMPI v5 requires pmix) default is None
hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the
Expand Down Expand Up @@ -132,6 +133,7 @@ def __init__(
only)
- error_log_file (str): Name of the error log file to use for storing exceptions
raised by the Python functions submitted to the Executor.
- run_time_limit (int): The maximum runtime in seconds for each task. Default: None
pysqa_config_directory (str, optional): path to the pysqa config directory (only for pysqa based backend).
pmi_mode (str): PMI interface to use (OpenMPI v5 requires pmix) default is None
hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the
Expand Down Expand Up @@ -267,6 +269,7 @@ class SlurmJobExecutor(BaseExecutor):
compute nodes. Defaults to False.
- error_log_file (str): Name of the error log file to use for storing exceptions raised
by the Python functions submitted to the Executor.
- run_time_limit (int): The maximum runtime in seconds for each task. Default: None
pmi_mode (str): PMI interface to use (OpenMPI v5 requires pmix) default is None
hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the
context of an HPC cluster this is essential to be able to communicate to an
Expand Down Expand Up @@ -356,6 +359,7 @@ def __init__(
compute nodes. Defaults to False.
- error_log_file (str): Name of the error log file to use for storing exceptions
raised by the Python functions submitted to the Executor.
- run_time_limit (int): The maximum runtime in seconds for each task. Default: None
pmi_mode (str): PMI interface to use (OpenMPI v5 requires pmix) default is None
hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the
context of an HPC cluster this is essential to be able to communicate to an
Expand Down Expand Up @@ -469,6 +473,7 @@ def create_slurm_executor(
compute nodes. Defaults to False.
- error_log_file (str): Name of the error log file to use for storing exceptions raised
by the Python functions submitted to the Executor.
- run_time_limit (int): The maximum runtime in seconds for each task. Default: None
pmi_mode (str): PMI interface to use (OpenMPI v5 requires pmix) default is None
hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the
context of an HPC cluster this is essential to be able to communicate to an
Expand Down
4 changes: 4 additions & 0 deletions src/executorlib/standalone/command.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ def generate_slurm_command(
openmpi_oversubscribe: bool = False,
slurm_cmd_args: Optional[list[str]] = None,
pmi_mode: Optional[str] = None,
run_time_limit: Optional[int] = None,
) -> list[str]:
"""
Generate the command list for the SLURM interface.
Expand All @@ -140,6 +141,7 @@ def generate_slurm_command(
openmpi_oversubscribe (bool, optional): Whether to oversubscribe the cores. Defaults to False.
slurm_cmd_args (list[str], optional): Additional command line arguments. Defaults to [].
pmi_mode (str): PMI interface to use (OpenMPI v5 requires pmix) default is None
run_time_limit (int): The maximum runtime in seconds for each task. Default: None

Returns:
list[str]: The generated command list.
Expand All @@ -159,6 +161,8 @@ def generate_slurm_command(
command_prepend_lst += ["--exact"]
if openmpi_oversubscribe:
command_prepend_lst += ["--oversubscribe"]
if run_time_limit is not None:
command_prepend_lst += ["--time=" + str(run_time_limit // 60 + 1)]
Comment on lines +164 to +165
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

--time conversion uses floor+1 instead of ceiling, wasting a minute for exact multiples of 60.

run_time_limit // 60 + 1 produces floor(seconds / 60) + 1, not ceil(seconds / 60). For exact multiples (e.g., run_time_limit=3600), this allocates 61 minutes instead of 60. Non-exact multiples (e.g., 3601 s) happen to produce the correct ceiling, so the formula is inconsistent.

🐛 Proposed fix using proper ceiling arithmetic
     if run_time_limit is not None:
-        command_prepend_lst += ["--time=" + str(run_time_limit // 60 + 1)]
+        command_prepend_lst += ["--time=" + str((run_time_limit + 59) // 60)]
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/executorlib/standalone/command.py` around lines 164 - 165, The current
conversion of run_time_limit to minutes uses floor+1 and over-allocates for
exact multiples; change the calculation in the block that appends to
command_prepend_lst (the line building "--time=" from run_time_limit) to use a
proper ceiling: replace the expression run_time_limit // 60 + 1 with a correct
ceiling computation (e.g., use math.ceil(run_time_limit / 60) or integer math
(run_time_limit + 59) // 60) so exact multiples map to the exact minute count.

if slurm_cmd_args is not None and len(slurm_cmd_args) > 0:
command_prepend_lst += slurm_cmd_args
return command_prepend_lst
1 change: 1 addition & 0 deletions src/executorlib/task_scheduler/file/spawner_pysqa.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ def execute_with_pysqa(
"command": " ".join(command),
"dependency_list": [str(qid) for qid in task_dependent_lst],
"working_directory": os.path.abspath(cwd),
"run_time_max": resource_dict.get("run_time_limit"),
}
if "cwd" in resource_dict:
del resource_dict["cwd"]
Expand Down
5 changes: 5 additions & 0 deletions src/executorlib/task_scheduler/interactive/spawner_flux.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ class FluxPythonSpawner(BaseSpawner):
flux_executor (flux.job.FluxExecutor, optional): The FluxExecutor instance. Defaults to None.
flux_executor_nesting (bool, optional): Whether to use nested FluxExecutor. Defaults to False.
flux_log_files (bool, optional): Write flux stdout and stderr files. Defaults to False.
run_time_limit (int): The maximum runtime in seconds for each task. Default: None
"""

def __init__(
Expand All @@ -61,6 +62,7 @@ def __init__(
flux_executor: Optional[flux.job.FluxExecutor] = None,
flux_executor_nesting: bool = False,
flux_log_files: bool = False,
run_time_limit: Optional[int] = None,
):
super().__init__(
cwd=cwd,
Expand All @@ -78,6 +80,7 @@ def __init__(
self._flux_log_files = flux_log_files
self._priority = priority
self._future = None
self._run_time_limit = run_time_limit

def bootup(
self,
Expand Down Expand Up @@ -128,6 +131,8 @@ def bootup(
if self._cwd is not None:
jobspec.cwd = self._cwd
os.makedirs(self._cwd, exist_ok=True)
if self._run_time_limit is not None:
jobspec.duration = self._run_time_limit
file_prefix = "flux_" + str(self._worker_id)
if self._flux_log_files and self._cwd is not None:
jobspec.stderr = os.path.join(self._cwd, file_prefix + ".err")
Expand Down
4 changes: 4 additions & 0 deletions src/executorlib/task_scheduler/interactive/spawner_pysqa.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ def __init__(
pmi_mode: Optional[str] = None,
config_directory: Optional[str] = None,
backend: Optional[str] = None,
run_time_limit: Optional[int] = None,
**kwargs,
):
"""
Expand All @@ -50,6 +51,7 @@ def __init__(
pmi_mode (str, optional): PMI interface to use (OpenMPI v5 requires pmix) default is None
config_directory (str, optional): path to the pysqa config directory (only for pysqa based backend).
backend (str): name of the backend used to spawn tasks.
run_time_limit (int): The maximum runtime in seconds for each task. Default: None
"""
super().__init__(
cwd=cwd,
Expand All @@ -68,6 +70,7 @@ def __init__(
self._pysqa_submission_kwargs = kwargs
self._process: Optional[int] = None
self._queue_adapter: Optional[QueueAdapter] = None
self._run_time_limit = run_time_limit

def bootup(
self,
Expand Down Expand Up @@ -191,6 +194,7 @@ def _start_process_helper(
command=" ".join(self.generate_command(command_lst=command_lst)),
working_directory=working_directory,
cores=int(self._cores * self._threads_per_core),
run_time_max=self._run_time_limit,
**self._pysqa_submission_kwargs,
)

Expand Down
4 changes: 4 additions & 0 deletions src/executorlib/task_scheduler/interactive/spawner_slurm.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ def __init__(
openmpi_oversubscribe: bool = False,
slurm_cmd_args: Optional[list[str]] = None,
pmi_mode: Optional[str] = None,
run_time_limit: Optional[int] = None,
):
"""
Srun interface implementation.
Expand All @@ -47,6 +48,7 @@ def __init__(
openmpi_oversubscribe (bool, optional): Whether to oversubscribe the cores. Defaults to False.
slurm_cmd_args (list[str], optional): Additional command line arguments. Defaults to [].
pmi_mode (str): PMI interface to use (OpenMPI v5 requires pmix) default is None
run_time_limit (int): The maximum runtime in seconds for each task. Default: None
"""
super().__init__(
cwd=cwd,
Expand All @@ -60,6 +62,7 @@ def __init__(
self._num_nodes = num_nodes
self._exclusive = exclusive
self._pmi_mode = pmi_mode
self._run_time_limit = run_time_limit

def generate_command(self, command_lst: list[str]) -> list[str]:
"""
Expand All @@ -81,6 +84,7 @@ def generate_command(self, command_lst: list[str]) -> list[str]:
openmpi_oversubscribe=self._openmpi_oversubscribe,
slurm_cmd_args=self._slurm_cmd_args,
pmi_mode=self._pmi_mode,
run_time_limit=self._run_time_limit,
)
return super().generate_command(
command_lst=command_prepend_lst + command_lst,
Expand Down
25 changes: 25 additions & 0 deletions tests/unit/executor/test_flux_job.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
import os
import unittest
from time import sleep

import numpy as np

from executorlib import FluxJobExecutor
from executorlib.api import ExecutorlibSocketError


try:
Expand All @@ -20,6 +22,11 @@ def calc(i):
return i


def delayed_calc(i):
    """Return *i* unchanged after a fixed two-second pause.

    Simulates a task whose runtime can exceed a short ``run_time_limit``,
    so the time-limit handling of the executor can be exercised.
    """
    delay_seconds = 2
    sleep(delay_seconds)
    return i


def mpi_funct(i):
from mpi4py import MPI

Expand Down Expand Up @@ -110,6 +117,24 @@ def test_single_task(self):
[[(1, 2, 0), (1, 2, 1)], [(2, 2, 0), (2, 2, 1)], [(3, 2, 0), (3, 2, 1)]],
)

def test_run_time_limit(self):
    """Verify per-task run_time_limit enforcement in FluxJobExecutor.

    Submits two slow tasks: f1 with a 1 s limit (shorter than the task's
    2 s sleep, so the scheduler kills it) and f2 with a 5 s limit (long
    enough to finish). The killed task must surface as an
    ExecutorlibSocketError when its result is requested.
    """
    with FluxJobExecutor(
        max_cores=1,
        resource_dict={"cores": 1},
        flux_executor=self.executor,
        block_allocation=False,
        pmi_mode=pmi,
    ) as p:
        f1 = p.submit(delayed_calc, 1, resource_dict={"run_time_limit": 1})
        f2 = p.submit(delayed_calc, 2, resource_dict={"run_time_limit": 5})
        self.assertFalse(f1.done())
        self.assertFalse(f2.done())
        self.assertEqual(f2.result(), 2)
        # f2.result() returning already guarantees f2 is done, so only the
        # killed future f1 needs an explicit completion check here.
        self.assertTrue(f1.done())
        with self.assertRaises(ExecutorlibSocketError):
            f1.result()
Comment on lines +120 to +136
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Timing logic is sound; one redundant assertion and a minor flakiness risk to be aware of.

The core timing invariant holds in both sequential and parallel execution:

  • Sequential (max_cores=1 gates f2 behind f1): f1 is killed at t≈1 s, f2 completes at t≈3 s → f1 has been dead for ~2 s when f2.result() returns.
  • Parallel (if block_allocation=False submits both Flux jobs independently): f1 is killed at t≈1 s, f2 completes at t≈2 s → f1 still dead before f2.result() returns.

Two minor points:

  1. Line 134 is redundant — assertTrue(f2.done()) is trivially true after f2.result() already returned on line 132. Removing it would tighten the intent of the test.

  2. Potential flakiness at line 133 — assertTrue(f1.done()) assumes Flux's kill-signal propagation back to the Python future completes within the ~1–2 s window before f2.result() returns. This is reasonable in practice, but if Flux's async notification path experiences transient latency the assertion can fail spuriously. Consider adding a short polling loop or a bounded wait only if this turns out to be flaky in CI.

♻️ Suggested cleanup (remove redundant assertion)
             self.assertEqual(f2.result(), 2)
             self.assertTrue(f1.done())
-            self.assertTrue(f2.done())
             with self.assertRaises(ExecutorlibSocketError):
                 f1.result()
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tests/unit/executor/test_flux_job.py` around lines 120 - 136, In
test_run_time_limit, remove the redundant assertion assertTrue(f2.done()) that
follows f2.result() (since f2.result() already guarantees completion) and to
avoid potential flakiness replace the direct assertTrue(f1.done()) with a short
bounded poll/wait that checks f1.done() repeatedly (e.g., loop with small sleep
and total timeout of a few seconds) before asserting True; locate these in the
test_run_time_limit function where f1 and f2 are submitted via FluxJobExecutor
and delayed_calc is used.


def test_output_files_cwd(self):
dirname = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "..", ".."))
os.makedirs(dirname, exist_ok=True)
Expand Down
5 changes: 3 additions & 2 deletions tests/unit/standalone/test_slurm_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,8 @@ def test_generate_slurm_command(self):
exclusive=True,
openmpi_oversubscribe=True,
slurm_cmd_args=["--help"],
run_time_limit=250,
)
self.assertEqual(len(command_lst), 12)
reply_lst = ['srun', '-n', '1', '-D', '/tmp/test', '-N', '1', '--cpus-per-task=2', '--gpus-per-task=1', '--exact', '--oversubscribe', '--help']
self.assertEqual(len(command_lst), 13)
reply_lst = ['srun', '-n', '1', '-D', '/tmp/test', '-N', '1', '--cpus-per-task=2', '--gpus-per-task=1', '--exact', '--oversubscribe', '--time=5', '--help']
self.assertEqual(command_lst, reply_lst)
Comment on lines +58 to 62
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

🧩 Analysis chain

🏁 Script executed:

ast-grep --pattern 'def generate_slurm_command($$$)'

Repository: pyiron/executorlib

Length of output: 4450


Replace run_time_limit // 60 + 1 with ceiling division to prevent over-allocating time for jobs with exact minute boundaries.

The current implementation at line 165 of src/executorlib/standalone/command.py uses run_time_limit // 60 + 1, which always adds an extra minute. For jobs where run_time_limit is a multiple of 60 seconds (e.g., 60, 120, 180), this allocates unnecessary overhead: run_time_limit=60 becomes --time=2 instead of --time=1. Use math.ceil(run_time_limit / 60) instead to allocate exactly the minimum required time: ceiling ensures protection against under-allocation (e.g., 61 seconds → 2 minutes) without over-allocating when the limit is already a whole number of minutes.

🧰 Tools
🪛 Ruff (0.15.1)

[error] 61-61: Probable insecure usage of temporary file or directory: "/tmp/test"

(S108)

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tests/unit/standalone/test_slurm_command.py` around lines 58 - 62, The time
rounding logic that builds the SLURM time option currently uses run_time_limit
// 60 + 1 which over-allocates when run_time_limit is an exact multiple of 60;
replace that expression with math.ceil(run_time_limit / 60) (import math if not
already imported) in the function that constructs the SLURM command/time flag
(the code path using the run_time_limit variable and producing the "--time="
value) so that exact-minute limits map to the correct minute value and partial
minutes round up correctly.

Loading