[Feature] Implement run time limits #930
Conversation
📝 Walkthrough

Adds a new `run_time_limit` parameter (seconds) across executors, spawners, and SLURM command generation; the value is propagated into resource dictionaries and used where scheduler interfaces accept duration (Flux `jobspec.duration`, SLURM `--time`, PySqa `run_time_max`). Also exposes `ExecutorlibSocketError` in the public API. No other control-flow changes.
Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant Client
    participant Executor as Executor (Flux/Slurm/Single)
    participant Spawner
    participant CmdGen as generate_slurm_command
    participant Scheduler as Scheduler (Flux/Slurm/PySqa)
    Client->>Executor: submit(task, resource_dict{..., run_time_limit})
    Executor->>Spawner: create spawner(resource_dict includes run_time_limit)
    Spawner->>CmdGen: build command (if Slurm) with run_time_limit -> --time
    Spawner->>Scheduler: submit job (jobspec.duration / run_time_max set when provided)
    Scheduler-->>Spawner: job accepted / started
    Spawner-->>Executor: return job handle
```
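The propagation above can be sketched as a single translation step. This is an illustrative sketch, not executorlib code: it assumes, per the walkthrough, that Flux and PySqa take the value in seconds while SLURM's `--time` flag takes minutes, and the function and key names are invented for the example.

```python
def translate_run_time_limit(run_time_limit):
    """Map one run_time_limit (seconds) onto the three scheduler interfaces."""
    if run_time_limit is None:
        return {}
    return {
        "flux_jobspec_duration": float(run_time_limit),  # Flux jobspec.duration: seconds
        "slurm_time": (run_time_limit + 59) // 60,       # SLURM --time: minutes, rounded up
        "pysqa_run_time_max": run_time_limit,            # PySqa run_time_max: passed through
    }

print(translate_run_time_limit(3600))  # {'flux_jobspec_duration': 3600.0, 'slurm_time': 60, 'pysqa_run_time_max': 3600}
```

The SLURM entry rounds up to whole minutes, matching the ceiling behavior suggested later in the review rather than the floor-plus-one formula in the diff.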
Estimated code review effort: 🎯 3 (Moderate) | ⏱️ ~20 minutes
🚥 Pre-merge checks: ✅ 2 passed | ❌ 1 failed (1 warning)
Actionable comments posted: 1
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
src/executorlib/executor/single.py (1)
437-474: ⚠️ Potential issue | 🔴 Critical

Strip `run_time_limit` from `resource_dict` before passing to `MpiExecSpawner`.

`create_single_node_executor` removes scheduler-specific keys (`threads_per_core`, `gpus_per_core`, `slurm_cmd_args`) before passing `resource_dict` as `executor_kwargs` to the task schedulers. However, `run_time_limit` is not removed. When `executor_kwargs` is passed down to `_execute_multiple_tasks`, any unconsumed keys are forwarded as `**kwargs` to the spawner instantiation. Since `MpiExecSpawner` (via `SubprocessSpawner`) does not accept `run_time_limit` and has no `**kwargs` capture, this raises a `TypeError` at runtime if a user sets `run_time_limit` in `resource_dict` for `SingleNodeExecutor`.

🐛 Proposed fix

```diff
 if "slurm_cmd_args" in resource_dict:
     del resource_dict["slurm_cmd_args"]
+if "run_time_limit" in resource_dict:
+    del resource_dict["run_time_limit"]
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/executorlib/executor/single.py` around lines 437 - 474, The resource_dict passed from create_single_node_executor still contains run_time_limit which gets forwarded as executor_kwargs into the task schedulers and ultimately to MpiExecSpawner/SubprocessSpawner (via _execute_multiple_tasks), causing a TypeError; remove "run_time_limit" from resource_dict alongside the other keys (threads_per_core, gpus_per_core, slurm_cmd_args) before constructing BlockAllocationTaskScheduler or OneProcessTaskScheduler so the spawner is not instantiated with an unsupported keyword argument.
🧹 Nitpick comments (2)
src/executorlib/executor/flux.py (1)
167-174: `run_time_limit` missing from `FluxJobExecutor.default_resource_dict`.

`FluxClusterExecutor.default_resource_dict` (line 371) explicitly sets `"run_time_limit": None`, but `FluxJobExecutor.default_resource_dict` does not include it, despite the docstring advertising it as a valid key. All other shared keys are present in both defaults. While the behaviour is identical at runtime (the spawner defaults to `None`), the inconsistency is confusing.

♻️ Suggested fix

```diff
 default_resource_dict: dict = {
     "cores": 1,
     "threads_per_core": 1,
     "gpus_per_core": 0,
     "cwd": None,
     "openmpi_oversubscribe": False,
     "slurm_cmd_args": [],
+    "run_time_limit": None,
 }
```

🤖 Prompt for AI Agents

Verify each finding against the current code and only fix it if needed. In `@src/executorlib/executor/flux.py` around lines 167 - 174, FluxJobExecutor.default_resource_dict is missing the "run_time_limit" key which FluxClusterExecutor.default_resource_dict includes; update the dict in FluxJobExecutor.default_resource_dict to add "run_time_limit": None so the two defaults are consistent with the docstring and other shared keys (compare with FluxClusterExecutor.default_resource_dict to mirror its structure).

src/executorlib/executor/slurm.py (1)
161-168: `run_time_limit` missing from `SlurmClusterExecutor` and `SlurmJobExecutor` default resource dicts (lines 161-168, 386-393).

Both SLURM executor default dicts omit `"run_time_limit": None`, inconsistent with `FluxClusterExecutor` which includes it. The docstrings advertise it as a valid key for all executors.

♻️ Suggested fix (apply to both `SlurmClusterExecutor` and `SlurmJobExecutor` `default_resource_dict`)

```diff
 default_resource_dict: dict = {
     "cores": 1,
     "threads_per_core": 1,
     "gpus_per_core": 0,
     "cwd": None,
     "openmpi_oversubscribe": False,
     "slurm_cmd_args": [],
+    "run_time_limit": None,
 }
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/executorlib/executor/slurm.py` around lines 161 - 168, The Slurm executors' default_resource_dicts are missing the "run_time_limit" key; update the default_resource_dict in both SlurmClusterExecutor and SlurmJobExecutor to include "run_time_limit": None so it matches FluxClusterExecutor and the documented API; locate the default_resource_dict definitions inside the SlurmClusterExecutor and SlurmJobExecutor classes and add the "run_time_limit" entry to each default dict.
🤖 Prompt for AI Agents

Verify each finding against the current code and only fix it if needed. In `@src/executorlib/standalone/command.py` around lines 164 - 165, the current conversion of run_time_limit to minutes uses floor+1 and over-allocates for exact multiples; change the calculation in the block that appends to command_prepend_lst (the line building "--time=" from run_time_limit) to use a proper ceiling: replace the expression run_time_limit // 60 + 1 with a correct ceiling computation (e.g., use math.ceil(run_time_limit / 60) or integer math (run_time_limit + 59) // 60) so exact multiples map to the exact minute count.
```python
if run_time_limit is not None:
    command_prepend_lst += ["--time=" + str(run_time_limit // 60 + 1)]
```
`--time` conversion uses floor+1 instead of ceiling, wasting a minute for exact multiples of 60.

`run_time_limit // 60 + 1` produces `floor(seconds / 60) + 1`, not `ceil(seconds / 60)`. For exact multiples (e.g., `run_time_limit=3600`), this allocates 61 minutes instead of 60. Non-exact multiples (e.g., 3601 s) happen to produce the correct ceiling, so the formula is inconsistent.
🐛 Proposed fix using proper ceiling arithmetic

```diff
 if run_time_limit is not None:
-    command_prepend_lst += ["--time=" + str(run_time_limit // 60 + 1)]
+    command_prepend_lst += ["--time=" + str((run_time_limit + 59) // 60)]
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/executorlib/standalone/command.py` around lines 164 - 165, The current
conversion of run_time_limit to minutes uses floor+1 and over-allocates for
exact multiples; change the calculation in the block that appends to
command_prepend_lst (the line building "--time=" from run_time_limit) to use a
proper ceiling: replace the expression run_time_limit // 60 + 1 with a correct
ceiling computation (e.g., use math.ceil(run_time_limit / 60) or integer math
(run_time_limit + 59) // 60) so exact multiples map to the exact minute count.
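The inconsistency is easy to demonstrate in isolation. A minimal sketch comparing the two formulas (standalone helpers for illustration, not the actual `generate_slurm_command` code):

```python
import math

def minutes_floor_plus_one(seconds: int) -> int:
    # Formula currently in the diff: always adds a minute.
    return seconds // 60 + 1

def minutes_ceiling(seconds: int) -> int:
    # Reviewer's suggestion: round up only when a partial minute remains.
    return (seconds + 59) // 60  # same as math.ceil(seconds / 60) for seconds >= 0

# Exact multiple of 60: the formulas disagree by one minute.
print(minutes_floor_plus_one(3600), minutes_ceiling(3600))  # 61 60
# Partial minute: both round up to the same value.
print(minutes_floor_plus_one(3601), minutes_ceiling(3601))  # 61 61
assert minutes_ceiling(3600) == math.ceil(3600 / 60) == 60
```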
Codecov Report

✅ All modified and coverable lines are covered by tests.

Additional details and impacted files:

```
@@           Coverage Diff           @@
##             main     #930   +/-  ##
==========================================
+ Coverage   93.87%   93.89%   +0.02%
==========================================
  Files          38       38
  Lines        1974     1981       +7
==========================================
+ Hits         1853     1860       +7
  Misses        121      121
```

☔ View full report in Codecov by Sentry.
for more information, see https://pre-commit.ci
Actionable comments posted: 2
```python
def test_run_time_limit(self):
    with FluxJobExecutor(
        max_cores=1,
        resource_dict={"cores": 1},
        flux_executor=self.executor,
        block_allocation=False,
        pmi_mode=pmi,
    ) as p:
        f1 = p.submit(delayed_calc, 1, resource_dict={"run_time_limit": 1})
        f2 = p.submit(delayed_calc, 2, resource_dict={"run_time_limit": 5})
        self.assertFalse(f1.done())
        self.assertFalse(f2.done())
        self.assertEqual(f2.result(), 2)
        self.assertTrue(f1.done())
        self.assertTrue(f2.done())
        with self.assertRaises(ExecutorlibSocketError):
            f1.result()
```
Timing logic is sound; one redundant assertion and a minor flakiness risk to be aware of.

The core timing invariant holds in both sequential and parallel execution:

- Sequential (`max_cores=1` gates f2 behind f1): f1 is killed at t≈1 s, f2 completes at t≈3 s → f1 has been dead for ~2 s when `f2.result()` returns.
- Parallel (if `block_allocation=False` submits both Flux jobs independently): f1 is killed at t≈1 s, f2 completes at t≈2 s → f1 is still dead before `f2.result()` returns.

Two minor points:

- Line 134 is redundant: `assertTrue(f2.done())` is trivially true after `f2.result()` already returned on line 132. Removing it would tighten the intent of the test.
- Potential flakiness at line 133: `assertTrue(f1.done())` assumes Flux's kill-signal propagation back to the Python future completes within the ~1-2 s window before `f2.result()` returns. This is reasonable in practice, but if Flux's async notification path experiences transient latency the assertion can fail spuriously. Consider adding a short polling loop or a bounded wait only if this turns out to be flaky in CI.
♻️ Suggested cleanup (remove redundant assertion)

```diff
         self.assertEqual(f2.result(), 2)
         self.assertTrue(f1.done())
-        self.assertTrue(f2.done())
         with self.assertRaises(ExecutorlibSocketError):
             f1.result()
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@tests/unit/executor/test_flux_job.py` around lines 120 - 136, In
test_run_time_limit, remove the redundant assertion AssertTrue(f2.done()) that
follows f2.result() (since f2.result() already guarantees completion) and to
avoid potential flakiness replace the direct AssertTrue(f1.done()) with a short
bounded poll/wait that checks f1.done() repeatedly (e.g., loop with small sleep
and total timeout of a few seconds) before asserting True; locate these in the
test_run_time_limit function where f1 and f2 are submitted via FluxJobExecutor
and delayed_calc is used.
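The bounded wait the review suggests could be sketched as a small helper. The name `wait_until_done` and the timeout values are illustrative, not part of the existing test suite:

```python
import time

def wait_until_done(future, timeout: float = 5.0, interval: float = 0.1) -> bool:
    """Poll future.done() until it returns True or the timeout expires."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        if future.done():
            return True
        time.sleep(interval)
    return future.done()  # one final check after the deadline

# In the test, the direct assertion would then read:
#     self.assertTrue(wait_until_done(f1, timeout=5.0))
```

This keeps the happy path fast (it returns on the first successful poll) while giving Flux's async notification path a few seconds of slack in CI.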
```diff
     run_time_limit=250,
 )
-self.assertEqual(len(command_lst), 12)
-reply_lst = ['srun', '-n', '1', '-D', '/tmp/test', '-N', '1', '--cpus-per-task=2', '--gpus-per-task=1', '--exact', '--oversubscribe', '--help']
+self.assertEqual(len(command_lst), 13)
+reply_lst = ['srun', '-n', '1', '-D', '/tmp/test', '-N', '1', '--cpus-per-task=2', '--gpus-per-task=1', '--exact', '--oversubscribe', '--time=5', '--help']
 self.assertEqual(command_lst, reply_lst)
```
🧩 Analysis chain

🏁 Script executed:

```shell
ast-grep --pattern 'def generate_slurm_command($$$)'
```

Repository: pyiron/executorlib (output length: 4450)
Replace `run_time_limit // 60 + 1` with ceiling division to prevent over-allocating time for jobs with exact minute boundaries.

The current implementation at line 165 of `src/executorlib/standalone/command.py` uses `run_time_limit // 60 + 1`, which always adds an extra minute. For jobs where `run_time_limit` is a multiple of 60 seconds (e.g., 60, 120, 180), this allocates unnecessary overhead: `run_time_limit=60` becomes `--time=2` instead of `--time=1`. Use `math.ceil(run_time_limit / 60)` instead to allocate exactly the minimum required time: ceiling protects against under-allocation (e.g., 61 seconds → 2 minutes) without over-allocating when the limit is already a whole number of minutes.
🧰 Tools
🪛 Ruff (0.15.1)
[error] 61-61: Probable insecure usage of temporary file or directory: "/tmp/test"
(S108)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@tests/unit/standalone/test_slurm_command.py` around lines 58 - 62, The time
rounding logic that builds the SLURM time option currently uses run_time_limit
// 60 + 1 which over-allocates when run_time_limit is an exact multiple of 60;
replace that expression with math.ceil(run_time_limit / 60) (import math if not
already imported) in the function that constructs the SLURM command/time flag
(the code path using the run_time_limit variable and producing the "--time="
value) so that exact-minute limits map to the correct minute value and partial
minutes round up correctly.
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
src/executorlib/task_scheduler/file/spawner_pysqa.py (1)
72-91: ⚠️ Potential issue | 🔴 Critical

`run_time_limit` is never removed from `resource_dict`, so it leaks into `qa.submit_job()` as an unknown kwarg.

Every other key that is extracted and renamed follows a delete-after-use pattern: `cwd` is deleted on line 75, `threads_per_core` on line 78, and the unsupported keys on lines 84-86. `run_time_limit` is the only extracted key that is left in `resource_dict`. Because `submit_kwargs.update(resource_dict)` runs at line 91, when `run_time_limit` is present it ends up being forwarded to `qa.submit_job()` as both `run_time_max=<value>` (correct) and `run_time_limit=<value>` (unknown). Depending on pysqa's `submit_job` implementation this will either raise a `TypeError` for an unexpected keyword argument or silently pass an undefined template variable.

🐛 Proposed fix: delete `run_time_limit` from `resource_dict` after extraction

```diff
 submit_kwargs = {
     "command": " ".join(command),
     "dependency_list": [str(qid) for qid in task_dependent_lst],
     "working_directory": os.path.abspath(cwd),
     "run_time_max": resource_dict.get("run_time_limit"),
 }
 if "cwd" in resource_dict:
     del resource_dict["cwd"]
+if "run_time_limit" in resource_dict:
+    del resource_dict["run_time_limit"]
 if "threads_per_core" in resource_dict:
```

Alternatively, use `pop` at the point of extraction to keep it in one place:

```diff
 submit_kwargs = {
     "command": " ".join(command),
     "dependency_list": [str(qid) for qid in task_dependent_lst],
     "working_directory": os.path.abspath(cwd),
-    "run_time_max": resource_dict.get("run_time_limit"),
+    "run_time_max": resource_dict.pop("run_time_limit", None),
 }
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/executorlib/task_scheduler/file/spawner_pysqa.py` around lines 72 - 91, resource_dict still contains "run_time_limit" after you map it to submit_kwargs["run_time_max"], causing it to be forwarded to qa.submit_job() as an unexpected kwarg; remove or pop "run_time_limit" from resource_dict right after creating the "run_time_max" entry (the code manipulating resource_dict and submit_kwargs in this block, including the variables resource_dict, submit_kwargs and the later call qa.submit_job()) so submit_kwargs.update(resource_dict) won't reintroduce "run_time_limit".
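The `pop`-based variant can be exercised on its own with a stand-in dict. The `run_time_max`/`run_time_limit` key names come from the diff; everything else here is illustrative:

```python
def build_submit_kwargs(resource_dict: dict) -> dict:
    # pop() both reads the value and removes the key, so a later
    # submit_kwargs.update(resource_dict) cannot reintroduce it.
    submit_kwargs = {
        "run_time_max": resource_dict.pop("run_time_limit", None),
    }
    submit_kwargs.update(resource_dict)  # remaining keys pass through untouched
    return submit_kwargs

kwargs = build_submit_kwargs({"run_time_limit": 600, "cores": 2})
print(kwargs)  # {'run_time_max': 600, 'cores': 2}
```

Note that `pop` mutates the caller's dict, which matches the existing delete-after-use pattern in this spawner but would need a copy first if the dict were reused elsewhere.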
Let me test this in a minute - looks really cool
Summary by CodeRabbit
New Features
Documentation
Tests