Interactive: Separate future and taskdict #800
Walkthrough
Refactors interactive schedulers to extract the Future from task_dict and pass it explicitly as a new future_obj parameter to execute_task_dict and its helpers; updates callers (onetoone, blockallocation) to pop and propagate the Future and to track active tasks by Future identity. All result/exception propagation uses the provided Future.
Sequence Diagram(s)
sequenceDiagram
autonumber
participant P as Producer
participant S as Scheduler (onetoone/blockallocation)
participant T as Worker Thread
participant X as execute_task_dict (shared)
participant C as Cache Helpers
participant F as Future
P->>S: submit(task_dict {fn, future})
S->>S: f = task_dict.pop("future")
S->>S: active_task_dict[f] = task_dict
S->>T: start _execute_task_in_thread(task_dict, future_obj=f)
T->>X: execute_task_dict(task_dict, future_obj=f, interface, ...)
alt cache enabled
X->>C: _execute_task_with_cache(..., future_obj=f, ...)
C->>F: set_running_or_notify_cancel()
C-->>F: set_result(...) / set_exception(...)
else no cache
X->>C: _execute_task_without_cache(..., future_obj=f)
C->>F: set_running_or_notify_cancel()
C-->>F: set_result(...) / set_exception(...)
end
X-->>T: done
T-->>S: task_done
S->>S: del active_task_dict[f]
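The flow in the diagram can be sketched in a few lines of plain Python. This is only an illustration under stated assumptions: `execute_task` and `submit` are hypothetical stand-ins for execute_task_dict and the scheduler's submit logic, and no interface, cache, or queue from executorlib is involved.

```python
from concurrent.futures import Future
from threading import Thread


def execute_task(task_dict: dict, future_obj: Future) -> None:
    """Hypothetical stand-in for execute_task_dict: run fn and report via future_obj."""
    if not future_obj.set_running_or_notify_cancel():
        return  # the caller cancelled the Future before it started
    try:
        result = task_dict["fn"](*task_dict["args"], **task_dict["kwargs"])
    except Exception as error:
        future_obj.set_exception(error)
    else:
        future_obj.set_result(result)


def submit(task_dict: dict, active_task_dict: dict) -> Thread:
    """Pop the Future, track the task by Future identity, start a worker thread."""
    future_obj = task_dict.pop("future")
    active_task_dict[future_obj] = task_dict  # Future objects are hashable by identity
    thread = Thread(target=execute_task, args=(task_dict, future_obj))
    thread.start()
    return thread


active: dict = {}
future: Future = Future()
worker = submit({"fn": sum, "args": ([1, 2, 3],), "kwargs": {}, "future": future}, active)
worker.join()
del active[future]  # the task is finished, so stop tracking it
```

After the pop, the task dictionary carries only the callable and its arguments, while the Future travels separately as future_obj.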
Estimated code review effort
🎯 4 (Complex) | ⏱️ ~60 minutes
Codecov Report
✅ All modified and coverable lines are covered by tests.
Additional details and impacted files
@@           Coverage Diff           @@
##             main     #800   +/-   ##
=======================================
  Coverage   97.75%   97.75%
=======================================
  Files          32       32
  Lines        1468     1468
=======================================
  Hits         1435     1435
  Misses         33       33
Actionable comments posted: 1
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
executorlib/task_scheduler/interactive/blockallocation.py (1)
243-253: Ensure queue.task_done is always called
If execute_task_dict unexpectedly raises, the queue item would never be marked done, risking deadlock on join(). Wrap in try/finally.
-    f = task_dict.pop("future")
-    execute_task_dict(
-        task_dict=task_dict,
-        future_obj=f,
-        interface=interface,
-        cache_directory=cache_directory,
-        cache_key=cache_key,
-        error_log_file=error_log_file,
-    )
-    task_done(future_queue=future_queue)
+    f = task_dict.pop("future")
+    try:
+        execute_task_dict(
+            task_dict=task_dict,
+            future_obj=f,
+            interface=interface,
+            cache_directory=cache_directory,
+            cache_key=cache_key,
+            error_log_file=error_log_file,
+        )
+    finally:
+        task_done(future_queue=future_queue)
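The deadlock risk described in this comment can be reproduced with the standard library alone. The sketch below is an illustration, not executorlib code: `failing_task` and `worker` are hypothetical names, and the worker records the error instead of letting it escape so the example stays quiet.

```python
import queue
from threading import Thread


def failing_task() -> None:
    raise RuntimeError("unexpected failure inside the worker")


def worker(work_queue: queue.Queue, errors: list) -> None:
    """Process one queue item and always mark it done, even on failure."""
    task = work_queue.get()
    try:
        task()
    except RuntimeError as error:
        errors.append(error)  # record the failure instead of crashing the thread
    finally:
        work_queue.task_done()  # without this call, join() below would block forever


work_queue: queue.Queue = queue.Queue()
errors: list = []
work_queue.put(failing_task)
thread = Thread(target=worker, args=(work_queue, errors))
thread.start()
work_queue.join()  # returns because task_done() ran in the finally block
thread.join()
```

Queue.join() only returns once every item fetched with get() has a matching task_done() call, which is why the finally block matters.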
🧹 Nitpick comments (2)
executorlib/task_scheduler/interactive/shared.py (1)
73-76: Unify Future state transition (minor)
The explicit not future_obj.done() guard is redundant; set_running_or_notify_cancel() already handles done/cancelled states. Consider simplifying for consistency with the cache path.
-    if not future_obj.done() and future_obj.set_running_or_notify_cancel():
+    if future_obj.set_running_or_notify_cancel():
executorlib/task_scheduler/interactive/onetoone.py (1)
259-274: Potential process leak: interface not shut down after single-task execution
In _execute_task_in_thread, a new interface is created per call but never explicitly shut down on success. If the remote process persists, this leaks resources. Consider capturing the interface and shutting it down in a finally block. If the design intends a persistent one-shot child that auto-exits, please ignore after confirming.
-    execute_task_dict(
-        task_dict=task_dict,
-        future_obj=future_obj,
-        interface=interface_bootup(
-            command_lst=get_interactive_execute_command(
-                cores=cores,
-            ),
-            connections=spawner(cores=cores, **kwargs),
-            hostname_localhost=hostname_localhost,
-            log_obj_size=log_obj_size,
-            worker_id=worker_id,
-        ),
-        cache_directory=cache_directory,
-        cache_key=cache_key,
-        error_log_file=error_log_file,
-    )
+    iface = interface_bootup(
+        command_lst=get_interactive_execute_command(cores=cores),
+        connections=spawner(cores=cores, **kwargs),
+        hostname_localhost=hostname_localhost,
+        log_obj_size=log_obj_size,
+        worker_id=worker_id,
+    )
+    try:
+        execute_task_dict(
+            task_dict=task_dict,
+            future_obj=future_obj,
+            interface=iface,
+            cache_directory=cache_directory,
+            cache_key=cache_key,
+            error_log_file=error_log_file,
+        )
+    finally:
+        # Safe even if already closed inside execute_task_dict on error
+        iface.shutdown(wait=True)
If helpful, I can add a short test/diagnostic to assert no lingering child processes after a simple submit.
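The cleanup pattern suggested here can be shown without any real process spawning. In the sketch below, `DummyInterface` and `run_once` are hypothetical; the class only imitates the two methods named in this review (send_and_receive_dict and shutdown) and assumes that shutdown is safe to call more than once.

```python
class DummyInterface:
    """Hypothetical stand-in for the object returned by interface_bootup."""

    def __init__(self) -> None:
        self.closed = False

    def send_and_receive_dict(self, input_dict: dict):
        # Run the task locally; a real interface would talk to a child process.
        return input_dict["fn"](*input_dict["args"])

    def shutdown(self, wait: bool = True) -> None:
        self.closed = True  # assumed idempotent: a second call is harmless


def run_once(task_dict: dict, iface: DummyInterface):
    """Execute a single task and release the interface on every exit path."""
    try:
        return iface.send_and_receive_dict(task_dict)
    finally:
        iface.shutdown(wait=True)


ok_iface = DummyInterface()
ok_result = run_once({"fn": max, "args": (1, 5)}, ok_iface)

bad_iface = DummyInterface()
try:
    run_once({"fn": int, "args": ("not a number",)}, bad_iface)
except ValueError:
    pass  # the task failed, but the interface was still shut down
```

Both the successful and the failing call leave the interface closed, which is the property the review asks for.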
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (3)
- executorlib/task_scheduler/interactive/blockallocation.py (1 hunk)
- executorlib/task_scheduler/interactive/onetoone.py (6 hunks)
- executorlib/task_scheduler/interactive/shared.py (6 hunks)
🧰 Additional context used
🧬 Code graph analysis (2)
executorlib/task_scheduler/interactive/blockallocation.py (1)
- executorlib/task_scheduler/interactive/shared.py (1)
  - execute_task_dict (12-47)
executorlib/task_scheduler/interactive/shared.py (3)
- executorlib/task_scheduler/base.py (2)
  - future_queue (63-70)
  - shutdown (146-169)
- executorlib/standalone/interactive/communication.py (3)
  - SocketInterface (14-153)
  - send_and_receive_dict (82-94)
  - shutdown (120-134)
- executorlib/standalone/hdf.py (2)
  - dump (20-37)
  - get_output (71-87)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (15)
- GitHub Check: unittest_mpich (ubuntu-latest, 3.11)
- GitHub Check: unittest_mpich (ubuntu-latest, 3.12)
- GitHub Check: unittest_openmpi (ubuntu-latest, 3.13)
- GitHub Check: unittest_mpich (ubuntu-22.04-arm, 3.13)
- GitHub Check: unittest_openmpi (ubuntu-latest, 3.12)
- GitHub Check: unittest_openmpi (ubuntu-latest, 3.11)
- GitHub Check: benchmark (ubuntu-latest, 3.13, .ci_support/environment-mpich.yml)
- GitHub Check: unittest_flux_mpich
- GitHub Check: notebooks_integration
- GitHub Check: unittest_slurm_mpich
- GitHub Check: notebooks
- GitHub Check: unittest_flux_openmpi
- GitHub Check: unittest_old
- GitHub Check: benchmark (ubuntu-latest, 3.13, .ci_support/environment-openmpi.yml)
- GitHub Check: unittest_win
🔇 Additional comments (3)
executorlib/task_scheduler/interactive/shared.py (2)
51-56: Docstring addition looks good
Thanks for documenting task_done; clear and accurate.
37-47: API threading of future_obj is correct
Passing future_obj through both code paths is consistent and maintains external ownership.
executorlib/task_scheduler/interactive/onetoone.py (1)
190-211: Future propagation and active task tracking look correct
Popping future, using it as the active_task_dict key, and threading via future_obj is coherent.
     else:
         _, _, result = get_output(file_name=file_name)
-        future = task_dict["future"]
-        future.set_result(result)
+        future_obj.set_result(result)
Cache-hit path ignores cancellation; may raise InvalidStateError
When the result is served from cache, the Future is completed unconditionally. If the Future was cancelled/prematurely completed, this can raise InvalidStateError and violate expected cancellation semantics. Mirror the non-cache path by honoring set_running_or_notify_cancel().
Apply:
- else:
- _, _, result = get_output(file_name=file_name)
- future_obj.set_result(result)
+ else:
+ _, _, result = get_output(file_name=file_name)
+ if future_obj.set_running_or_notify_cancel():
+            future_obj.set_result(result)
🤖 Prompt for AI Agents
In executorlib/task_scheduler/interactive/shared.py around lines 122 to 124, the
cache-hit branch unconditionally calls future_obj.set_result(result) which can
raise InvalidStateError if the Future was cancelled; mirror the non-cache path
by first calling future_obj.set_running_or_notify_cancel() and if it returns
False stop/return without setting the result, otherwise call set_result(result)
(optionally catch InvalidStateError as a safeguard).
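The Future state transitions behind this comment can be checked directly with concurrent.futures. The sketch below is standalone and uses only standard-library calls; the variable names are illustrative. It covers three cases: a pending Future, a cancelled Future, and an already finished Future.

```python
from concurrent.futures import Future, InvalidStateError

# Case 1: a pending Future moves to running, so set_result is safe afterwards.
pending: Future = Future()
started = pending.set_running_or_notify_cancel()
pending.set_result("cached value")

# Case 2: a cancelled Future reports False and must not receive a result.
cancelled: Future = Future()
cancelled.cancel()
may_run = cancelled.set_running_or_notify_cancel()
try:
    cancelled.set_result("cached value")
    set_result_raised = False
except InvalidStateError:
    set_result_raised = True

# Case 3: a Future that already finished makes set_running_or_notify_cancel raise.
finished: Future = Future()
finished.set_result(1)
try:
    finished.set_running_or_notify_cancel()
    finished_raised = False
except RuntimeError:
    finished_raised = True
```

Case 2 is the failure the comment warns about. Case 3 shows that set_running_or_notify_cancel() does not quietly accept a Future that is already done, so a done() check ahead of it is not purely cosmetic.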
Actionable comments posted: 0
♻️ Duplicate comments (1)
executorlib/task_scheduler/interactive/shared.py (1)
121-124: Cache-hit bug: propagate cached errors via set_exception; cancellation concern from earlier review is now moot
The previous comment about honoring cancellation in the cache-hit branch is addressed by gating at the entrypoint with set_running_or_notify_cancel(). However, a separate issue remains: get_output() returns a success flag; when it’s False (cached error), you currently call set_result(result), which would incorrectly mark the Future successful with an exception object.
Apply:
-    else:
-        _, _, result = get_output(file_name=file_name)
-        future_obj.set_result(result)
+    else:
+        try:
+            _found, _success, _obj = get_output(file_name=file_name)
+            if _success:
+                future_obj.set_result(_obj)
+            else:
+                # Cached execution resulted in an error; propagate as exception
+                future_obj.set_exception(_obj if isinstance(_obj, BaseException) else RuntimeError(f"Cached error: {_obj!r}"))
+        except Exception as e:
+            # Reading cache failed; surface the failure instead of silently succeeding
+            future_obj.set_exception(e)
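The distinction between a cached result and a cached error can be sketched independently of the HDF5 cache. In the example below, `complete_from_cache` is a hypothetical helper, and the `(found, success, payload)` tuple layout is an assumption that mirrors how this comment describes the return value of get_output().

```python
from concurrent.futures import Future


def complete_from_cache(future_obj: Future, cached: tuple) -> None:
    """Complete future_obj from an assumed (found, success, payload) tuple."""
    _found, success, payload = cached
    if success:
        future_obj.set_result(payload)
    elif isinstance(payload, BaseException):
        future_obj.set_exception(payload)  # re-raise the cached error for the caller
    else:
        # The cache stored something that is not an exception; wrap it.
        future_obj.set_exception(RuntimeError(f"Cached error: {payload!r}"))


ok_future: Future = Future()
complete_from_cache(ok_future, (True, True, 42))

failed_future: Future = Future()
complete_from_cache(failed_future, (True, False, ValueError("boom")))
```

With this split, a caller of failed_future.result() sees the ValueError raised rather than receiving an exception object as if it were a normal return value.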
🧹 Nitpick comments (2)
executorlib/task_scheduler/interactive/shared.py (2)
52-57: Docstring/typing nit for task_done
Tighten wording and the parameter type; current text mentions “task dictionaries”.
Apply:
-    """
-    Mark the current task as done in the current queue.
-
-    Args:
-        future_queue (queue): Queue of task dictionaries waiting for execution.
-    """
+    """
+    Mark the current task as done on the given queue.
+
+    Args:
+        future_queue (queue.Queue): Work queue whose current task has finished.
+    """
111-120: Use high-resolution clock for runtime measurement (minor)
Prefer time.perf_counter() for durations.
Apply:
-    time_start = time.time()
+    time_start = time.perf_counter()
@@
-    data_dict["runtime"] = time.time() - time_start
+    data_dict["runtime"] = time.perf_counter() - time_start
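A minimal sketch of the suggested timing pattern follows. The `timed` helper is hypothetical and only illustrates the idea; it is not part of executorlib.

```python
import time


def timed(fn, *args, **kwargs):
    """Return (result, elapsed_seconds) measured with a monotonic clock."""
    time_start = time.perf_counter()
    result = fn(*args, **kwargs)
    return result, time.perf_counter() - time_start


result, runtime = timed(sum, range(1000))
```

time.perf_counter() is monotonic, so the measured duration cannot go negative when the system clock is adjusted, which can happen with time.time().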
📜 Review details
📒 Files selected for processing (1)
- executorlib/task_scheduler/interactive/shared.py (5 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
executorlib/task_scheduler/interactive/shared.py (2)
- executorlib/standalone/interactive/communication.py (3)
  - SocketInterface (14-153)
  - send_and_receive_dict (82-94)
  - shutdown (120-134)
- executorlib/standalone/hdf.py (2)
  - dump (20-37)
  - get_output (71-87)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (15)
- GitHub Check: unittest_openmpi (ubuntu-22.04-arm, 3.13)
- GitHub Check: unittest_flux_openmpi
- GitHub Check: unittest_openmpi (ubuntu-latest, 3.12)
- GitHub Check: unittest_openmpi (ubuntu-latest, 3.11)
- GitHub Check: notebooks_integration
- GitHub Check: unittest_openmpi (ubuntu-24.04-arm, 3.13)
- GitHub Check: unittest_openmpi (ubuntu-latest, 3.13)
- GitHub Check: unittest_openmpi (macos-latest, 3.13)
- GitHub Check: notebooks
- GitHub Check: unittest_mpich (ubuntu-22.04-arm, 3.13)
- GitHub Check: unittest_mpich (macos-latest, 3.13)
- GitHub Check: unittest_mpich (ubuntu-24.04-arm, 3.13)
- GitHub Check: benchmark (ubuntu-latest, 3.13, .ci_support/environment-openmpi.yml)
- GitHub Check: unittest_win
- GitHub Check: benchmark (ubuntu-latest, 3.13, .ci_support/environment-mpich.yml)
🔇 Additional comments (4)
executorlib/task_scheduler/interactive/shared.py (4)
34-48: Cancellation respected before dispatch (LGTM)
The entrypoint correctly gates execution with done() and set_running_or_notify_cancel().
74-79: Non-cache path error handling (LGTM)
Shutting down the interface on exception and completing the Future via set_exception is appropriate.
12-19: All execute_task_dict invocations updated to include future_obj; no stale or external helper calls remain.
35-36: Cache key excludes error_log_file
serialize_funct only serializes fn_args, fn_kwargs, resource_dict (and an optional cache_key) when computing the task key, so adding error_log_file to task_dict won’t fragment the cache.