Conversation
Walkthrough

Replaces the single-entry worker API with two entry points: a queue-driven loop (`execute_multiple_tasks`) and a single-task runner (`execute_single_task`).

Changes
Sequence Diagram(s)

```mermaid
sequenceDiagram
  autonumber
  participant T as Thread (Worker)
  participant S as execute_multiple_tasks
  participant Q as future_queue
  participant IF as Interface
  participant FN as Task fn
  participant F as Future
  T->>S: start(worker_id, kwargs…)
  S->>IF: interface_bootup(spawner, commands, …)
  alt init_function present
    S->>IF: init_function()
  end
  loop until shutdown sentinel
    S->>Q: get()
    alt shutdown sentinel
      S->>IF: shutdown(wait, join_queue)
      S-->>T: return
    else task_dict
      S->>FN: execute (with/without cache)
      FN-->>S: result / exception
      S->>F: set_result / set_exception
      S->>Q: task_done()
    end
  end
```
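The queue-driven loop in the diagram above can be sketched in plain Python. This is an illustrative stand-in, not executorlib's implementation: the `{"shutdown": True}` sentinel shape, the task-dict keys, and `worker_loop` itself are assumptions based on the diagram.

```python
import queue
from concurrent.futures import Future


def worker_loop(future_queue: queue.Queue) -> None:
    """Minimal sketch of a sentinel-driven worker loop (illustrative only)."""
    while True:
        task = future_queue.get()
        # A dict flagged as shutdown ends the loop (sentinel shape assumed here).
        if task.get("shutdown", False):
            future_queue.task_done()
            break
        fut: Future = task["future"]
        try:
            result = task["fn"](*task.get("args", ()), **task.get("kwargs", {}))
            fut.set_result(result)
        except Exception as exc:
            fut.set_exception(exc)
        finally:
            future_queue.task_done()


# Usage: submit one task and a shutdown sentinel
q: queue.Queue = queue.Queue()
f: Future = Future()
q.put({"fn": lambda x: x + 1, "args": (41,), "future": f})
q.put({"shutdown": True})
worker_loop(q)
print(f.result())  # → 42
```

Resolving each task's `Future` inside the loop is what lets the submitting thread block on `result()` without ever touching the queue directly.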
```mermaid
sequenceDiagram
  autonumber
  participant T as Thread (Worker)
  participant S1 as execute_single_task
  participant IF as Interface
  participant FN as Task fn
  participant F as Future
  T->>S1: start(task_dict, kwargs…)
  S1->>IF: interface_bootup(…)
  alt init_function present
    S1->>IF: init_function()
  end
  S1->>FN: execute (with/without cache)
  FN-->>S1: result / exception
  S1->>F: set_result / set_exception
  S1->>IF: shutdown(wait)
  S1-->>T: return
```
```mermaid
sequenceDiagram
  autonumber
  participant BA as BlockAllocationTaskScheduler
  participant Th as Thread
  participant S as execute_multiple_tasks
  BA->>Th: Thread(target=S, kwargs=…)
  Th->>S: run queue-driven worker
  BA->>Th: scale up via max_workers.setter → new Thread(target=S)
```
```mermaid
sequenceDiagram
  autonumber
  participant O as OneToOneTaskScheduler
  participant Th as Thread
  participant S1 as execute_single_task
  O->>Th: Thread(target=S1, kwargs={task_dict,…})
  Th->>S1: run single-task worker (no qtask)
```
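The one-to-one flow above spawns a fresh thread per task and resolves that task's `Future` directly. A minimal sketch, assuming a task-dict shape like the one in the walkthrough; `execute_one` is a hypothetical stand-in, not executorlib's `execute_single_task`:

```python
import threading
from concurrent.futures import Future


def execute_one(task_dict: dict) -> None:
    """Illustrative single-task runner: run the task, then resolve its Future."""
    fut: Future = task_dict["future"]
    try:
        args = task_dict.get("args", ())
        kwargs = task_dict.get("kwargs", {})
        fut.set_result(task_dict["fn"](*args, **kwargs))
    except Exception as exc:
        fut.set_exception(exc)


# One thread per task, mirroring the one-to-one scheduler shown above
f = Future()
t = threading.Thread(
    target=execute_one,
    kwargs={"task_dict": {"fn": sum, "args": ([1, 2, 3],), "future": f}},
)
t.start()
t.join()
print(f.result())  # → 6
```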
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~60 minutes

Possibly related PRs
Actionable comments posted: 1
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (4)
tests/test_mpiexecspawner.py (2)
446-451: Leftover calls to the removed `execute_tasks` will fail; replace them with `execute_multiple_tasks`. These tests will error at runtime.

```diff
-        execute_tasks(
+        execute_multiple_tasks(
             future_queue=q,
             cores=1,
             openmpi_oversubscribe=False,
             spawner=MpiExecSpawner,
         )
```

Repeat the same replacement at both remaining call sites.
Also applies to: 462-467, 538-544
446-447: Remove remaining `execute_tasks` calls in tests
- tests/test_mpiexecspawner.py: lines 446–448
- tests/test_mpiexecspawner.py: lines 462–464
- tests/test_mpiexecspawner.py: lines 538–540
Replace or remove these per the updated API.
executorlib/task_scheduler/interactive/blockallocation.py (1)
91-97: Propagate worker_id when scaling up workers. New threads created during scale-up don't receive a `worker_id`, unlike those created at init. Given that `execute_multiple_tasks` supports `worker_id` (used for logging/resource distribution), keep this consistent.
Apply:
```diff
-        elif self._max_workers < max_workers:
-            new_process_lst = [
-                Thread(
-                    target=execute_multiple_tasks,
-                    kwargs=self._process_kwargs,
-                )
-                for _ in range(max_workers - self._max_workers)
-            ]
+        elif self._max_workers < max_workers:
+            start_id = len(self._process)
+            new_process_lst = [
+                Thread(
+                    target=execute_multiple_tasks,
+                    kwargs=self._process_kwargs | {"worker_id": start_id + idx},
+                    name=f"BlockAllocWorker-{start_id + idx}",
+                )
+                for idx in range(max_workers - self._max_workers)
+            ]
```

executorlib/task_scheduler/interactive/onetoone.py (1)
200-209: Don't override a user-provided init_function. Setting `init_function=None` here discards any value supplied via `executor_kwargs`, which is likely unintended and a behavior change.
Apply:
```diff
         task_kwargs.update(
             {
-                "task_dict": task_dict,
-                "spawner": spawner,
-                "hostname_localhost": hostname_localhost,
-                "init_function": None,
+                "task_dict": task_dict,
+                "spawner": spawner,
+                "hostname_localhost": hostname_localhost,
             }
         )
```
🧹 Nitpick comments (4)
executorlib/task_scheduler/interactive/shared.py (2)
105-128: Fix docstring: wrong entity and incorrect Args for `execute_single_task`. Use "single task" and document `task_dict`; drop `queue_join_on_shutdown`, which isn't an argument.
- """ - Execute a single tasks in parallel using the message passing interface (MPI). + """ + Execute a single task in parallel using the message passing interface (MPI). @@ - future_queue (queue.Queue): task queue of dictionary objects which are submitted to the parallel process + task_dict (dict): Dictionary describing the task submitted to the parallel process + {"fn": Callable, "args": (), "kwargs": {}, "future": Future, "resource_dict": {}} @@ - queue_join_on_shutdown (bool): Join communication queue when thread is closed. Defaults to True.
31-54: Fix docstring: "single tasks" → "multiple tasks" and clarify the caching default. Minor user-facing text accuracy.
```diff
-    Execute a single tasks in parallel using the message passing interface (MPI).
+    Execute multiple tasks in parallel using the message passing interface (MPI).
@@
-        cache_directory (str, optional): The directory to store cache files. Defaults to "executorlib_cache".
+        cache_directory (str, optional): Directory to store cache files. If None, caching is disabled.
```

executorlib/task_scheduler/interactive/blockallocation.py (1)
66-69: Name worker threads for easier debugging. Optional, but naming threads makes tracing/logging simpler.
Apply:
```diff
         Thread(
-            target=execute_multiple_tasks,
-            kwargs=executor_kwargs | {"worker_id": worker_id},
+            target=execute_multiple_tasks,
+            kwargs=executor_kwargs | {"worker_id": worker_id},
+            name=f"BlockAllocWorker-{worker_id}",
         )
```

executorlib/task_scheduler/interactive/onetoone.py (1)
210-213: Name the per-task thread. A thread name helps trace single-task executions.
Apply:
```diff
-        process = Thread(
-            target=execute_single_task,
-            kwargs=task_kwargs,
-        )
+        process = Thread(
+            target=execute_single_task,
+            kwargs=task_kwargs,
+            name=f"OneToOneTask-{id(task_dict.get('future'))}",
+        )
```
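Both naming nitpicks rely on the same mechanism: a `Thread`'s `name` is visible as `threading.current_thread().name` inside the thread, which loggers and debuggers pick up. A minimal sketch (the worker function and the `BlockAllocWorker-` naming scheme are illustrative, not executorlib's code):

```python
import threading

results = {}


def worker(worker_id: int) -> None:
    # Record the thread's own name, as a log formatter would see it.
    results[worker_id] = threading.current_thread().name


threads = [
    threading.Thread(
        target=worker,
        kwargs={"worker_id": i},
        name=f"BlockAllocWorker-{i}",
    )
    for i in range(2)
]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(sorted(results.items()))  # → [(0, 'BlockAllocWorker-0'), (1, 'BlockAllocWorker-1')]
```

Unnamed threads show up as generic `Thread-N`, which makes interleaved log lines from several workers hard to attribute.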
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
💡 Knowledge Base configuration:
- MCP integration is disabled by default for public repositories
- Jira integration is disabled by default for public repositories
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (7)
- executorlib/task_scheduler/interactive/blockallocation.py (3 hunks)
- executorlib/task_scheduler/interactive/onetoone.py (2 hunks)
- executorlib/task_scheduler/interactive/shared.py (2 hunks)
- tests/test_fluxpythonspawner.py (3 hunks)
- tests/test_mpiexecspawner.py (5 hunks)
- tests/test_singlenodeexecutor_shell_executor.py (4 hunks)
- tests/test_singlenodeexecutor_shell_interactive.py (2 hunks)
🧰 Additional context used
🧬 Code graph analysis (7)
tests/test_singlenodeexecutor_shell_executor.py (1)
executorlib/task_scheduler/interactive/shared.py (1)
execute_multiple_tasks(16-88)
tests/test_fluxpythonspawner.py (1)
executorlib/task_scheduler/interactive/shared.py (1)
execute_multiple_tasks(16-88)
tests/test_singlenodeexecutor_shell_interactive.py (1)
executorlib/task_scheduler/interactive/shared.py (1)
execute_multiple_tasks(16-88)
executorlib/task_scheduler/interactive/shared.py (3)
executorlib/standalone/interactive/spawner.py (2)
`BaseSpawner` (9-59), `MpiExecSpawner` (141-158)

executorlib/standalone/interactive/communication.py (2)

`interface_bootup` (156-204), `send_dict` (46-57)

executorlib/standalone/command.py (1)

`get_interactive_execute_command` (92-114)
tests/test_mpiexecspawner.py (1)
executorlib/task_scheduler/interactive/shared.py (1)
execute_multiple_tasks(16-88)
executorlib/task_scheduler/interactive/blockallocation.py (1)
executorlib/task_scheduler/interactive/shared.py (1)
execute_multiple_tasks(16-88)
executorlib/task_scheduler/interactive/onetoone.py (1)
executorlib/task_scheduler/interactive/shared.py (1)
execute_single_task(91-152)
🔇 Additional comments (8)
executorlib/task_scheduler/interactive/shared.py (1)
68-89: LGTM: loop, sentinel handling, and per-task completion are clear. The multi-task loop correctly handles shutdown, optional queue join, and per-task execution/caching.
tests/test_singlenodeexecutor_shell_executor.py (1)
35-40: LGTM: tests migrated to `execute_multiple_tasks`. Rename is consistent; behavior unchanged.
Also applies to: 61-66, 89-93
tests/test_singlenodeexecutor_shell_interactive.py (1)
91-97: LGTM: `execute_multiple_tasks` with spawner and init_function. Covers the new API surface well.
tests/test_fluxpythonspawner.py (1)
115-121: LGTM: Flux tests updated to `execute_multiple_tasks`. Signature and kwargs remain correct.
Also applies to: 130-136
tests/test_mpiexecspawner.py (2)
13-14: LGTM: import updated to execute_multiple_tasks.
479-483: LGTM: renamed calls already correct in other tests here. Also applies to: 496-501, 519-526
executorlib/task_scheduler/interactive/blockallocation.py (1)
13-13: Switch to `execute_multiple_tasks`: LGTM. Import aligns with the new API. No issues spotted.
executorlib/task_scheduler/interactive/onetoone.py (1)
7-7: Switch to `execute_single_task`: LGTM. Import matches the refactor intent.
```python
if error_log_file is not None:
    task_dict["error_log_file"] = error_log_file
if cache_directory is None:
    _execute_task_without_cache(interface=interface, task_dict=task_dict)
else:
    _execute_task_with_cache(
        interface=interface,
        task_dict=task_dict,
        cache_directory=cache_directory,
        cache_key=cache_key,
    )
```
`execute_single_task` leaks the interface; always shut down on the success path.
If no exception occurs, the interface process remains running. Ensure shutdown in a finally block.
```diff
-    if error_log_file is not None:
-        task_dict["error_log_file"] = error_log_file
-    if cache_directory is None:
-        _execute_task_without_cache(interface=interface, task_dict=task_dict)
-    else:
-        _execute_task_with_cache(
-            interface=interface,
-            task_dict=task_dict,
-            cache_directory=cache_directory,
-            cache_key=cache_key,
-        )
+    try:
+        if error_log_file is not None:
+            task_dict["error_log_file"] = error_log_file
+        if cache_directory is None:
+            _execute_task_without_cache(interface=interface, task_dict=task_dict)
+        else:
+            _execute_task_with_cache(
+                interface=interface,
+                task_dict=task_dict,
+                cache_directory=cache_directory,
+                cache_key=cache_key,
+            )
+    finally:
+        # Be robust if helpers already shut down on exception
+        with contextlib.suppress(Exception):
+            interface.shutdown(wait=True)
```

(Note: the suggested code also requires `import contextlib` at the top of the module.)

📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```python
try:
    if error_log_file is not None:
        task_dict["error_log_file"] = error_log_file
    if cache_directory is None:
        _execute_task_without_cache(interface=interface, task_dict=task_dict)
    else:
        _execute_task_with_cache(
            interface=interface,
            task_dict=task_dict,
            cache_directory=cache_directory,
            cache_key=cache_key,
        )
finally:
    # Be robust if helpers already shut down on exception
    with contextlib.suppress(Exception):
        interface.shutdown(wait=True)
```
🤖 Prompt for AI Agents
In executorlib/task_scheduler/interactive/shared.py around lines 142 to 153, the
code calls either _execute_task_without_cache or _execute_task_with_cache but
does not stop the interface on the success path, leaking the interface process;
wrap the execution calls in a try/finally so that interface.shutdown() (or the
appropriate stop/close method on the interface) is always invoked after the task
completes or raises, preserving the existing error_log_file and cache logic and
ensuring shutdown occurs in both success and exception cases.
Codecov Report

✅ All modified and coverable lines are covered by tests.

Additional details and impacted files

```
@@            Coverage Diff             @@
##             main     #796      +/-   ##
==========================================
- Coverage   97.75%   97.75%   -0.01%
==========================================
  Files          33       33
  Lines        1468     1467       -1
==========================================
- Hits         1435     1434       -1
  Misses         33       33
==========================================
```

☔ View full report in Codecov by Sentry.
Actionable comments posted: 0
🧹 Nitpick comments (2)
tests/test_mpiexecspawner.py (2)
264-270: Avoid double-joining the queue. `execute_multiple_tasks` already joins the queue by default (`queue_join_on_shutdown=True`). Either disable that here or drop the explicit `q.join()` later to shave test time. The same nit applies to the other updated calls in this file.
Option A (disable internal join):
```diff
 execute_multiple_tasks(
     future_queue=q,
     cores=1,
     openmpi_oversubscribe=False,
     spawner=MpiExecSpawner,
     init_function=set_global,
+    queue_join_on_shutdown=False,
 )
```

Option B (keep default, remove explicit join later):
```diff
-        q.join()
+        # q.join() not needed; execute_multiple_tasks already joined the queue.
```
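The join semantics behind this nit can be seen with a plain `queue.Queue`: `join()` returns once every `get()` has been matched by a `task_done()`, so a second `join()` on an already-drained queue returns immediately rather than hanging:

```python
import queue

qd = queue.Queue()
qd.put("job")
item = qd.get()
qd.task_done()  # matches the get(); the queue's unfinished count drops to 0
qd.join()       # returns immediately: every task has been accounted for
qd.join()       # a second join is harmless but redundant
print("drained")  # → drained
```

This is why the extra `q.join()` in the test only costs time when the internal join is disabled; with the default it is a no-op.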
478-483: Consider adding a targeted test for `execute_single_task`. Since the PR introduces `execute_single_task`, add a minimal happy-path test to pin its behavior (e.g., the same `calc_array(i=2)` flow) to guard against future regressions.
I can draft a small test method mirroring this case using execute_single_task if you want.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
💡 Knowledge Base configuration:
- MCP integration is disabled by default for public repositories
- Jira integration is disabled by default for public repositories
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (1)
tests/test_mpiexecspawner.py (8 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
tests/test_mpiexecspawner.py (1)
executorlib/task_scheduler/interactive/shared.py (1)
execute_multiple_tasks(16-88)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (13)
- GitHub Check: unittest_openmpi (ubuntu-latest, 3.11)
- GitHub Check: unittest_openmpi (ubuntu-latest, 3.12)
- GitHub Check: unittest_openmpi (ubuntu-22.04-arm, 3.13)
- GitHub Check: unittest_openmpi (ubuntu-latest, 3.13)
- GitHub Check: unittest_openmpi (ubuntu-24.04-arm, 3.13)
- GitHub Check: unittest_openmpi (macos-latest, 3.13)
- GitHub Check: unittest_slurm_mpich
- GitHub Check: notebooks_integration
- GitHub Check: unittest_mpich (ubuntu-24.04-arm, 3.13)
- GitHub Check: unittest_flux_mpich
- GitHub Check: benchmark (ubuntu-latest, 3.13, .ci_support/environment-openmpi.yml)
- GitHub Check: benchmark (ubuntu-latest, 3.13, .ci_support/environment-mpich.yml)
- GitHub Check: unittest_win
🔇 Additional comments (6)
tests/test_mpiexecspawner.py (6)
446-451: Failure-path test updated to the new API is correct. Covers a missing required argument; exception propagation via the Future remains intact.
462-467: Wrong-argument failure-path test looks good. Validates TypeError surfacing through the Future with the renamed runner.
519-525: Cache-mode success-path updated correctly. Matches the new API and uses tearDown to clean the cache dir.
538-544: Cache-mode failure-path test looks good. Exercises error propagation with caching enabled against the renamed entry point.
13-13: Approve changes; no lingering `execute_tasks` references found.
496-501: No action needed; `openmpi_oversubscribe` is correctly forwarded and applied. `MpiExecSpawner`'s command builder (`generate_mpiexec_command`) still checks `openmpi_oversubscribe` and adds `--oversubscribe` when true.
Summary by CodeRabbit
New Features
Refactor
Tests