Interactive: Interrupt interface bootup when the executor is shutdown during bootup#801
Interactive: Interrupt interface bootup when the executor is shutdown during bootup#801jan-janssen merged 36 commits intomainfrom
Conversation
for more information, see https://pre-commit.ci
|
Caution Review failedThe pull request is closed. WalkthroughAdds stoppable boot semantics and boot-status tracking to interactive SocketInterface and spawners; propagates new optional stop_function through interface_bootup and spawner.bootup; changes task submission helpers to return boolean for transient socket errors, adds task reset/retry flow and per-instance boot interruption in block allocation; updates tests and docstrings accordingly. Changes
Sequence Diagram(s)sequenceDiagram
autonumber
actor Caller
participant Scheduler
participant Interface as SocketInterface
participant Spawner
note over Scheduler,Interface: bootup with optional stop_function
Caller->>Scheduler: interface_bootup(command_lst, ..., stop_function)
Scheduler->>Interface: bootup(command_lst, stop_function)
Interface->>Spawner: bootup(command_lst, stop_function)
Spawner-->>Interface: bool (success?)
alt success
Interface-->>Scheduler: status=True (persisted cmd/stop)
Scheduler-->>Caller: Interface
else failure
Interface-->>Scheduler: status=False (socket reset)
Scheduler-->>Caller: Interface (status=False)
end
sequenceDiagram
autonumber
actor Producer
participant BlockScheduler as BlockAllocationWorker
participant Shared as shared.execute_task_dict
participant Interface as SocketInterface
participant Queue as future_queue
Producer->>BlockScheduler: submit task_dict + future
BlockScheduler->>Interface: ensure init (if needed)
BlockScheduler->>Shared: execute_task_dict(task_dict, future, Interface, ...)
alt submission succeeded
Shared-->>BlockScheduler: True
BlockScheduler->>Queue: task_done()
else transient socket error
Shared-->>BlockScheduler: False
BlockScheduler->>Shared: reset_task_dict(future, Queue, task_dict)
opt restart allowed
BlockScheduler->>Interface: bootup(stored_cmd, stored_stop)
Interface-->>BlockScheduler: status(bool)
end
end
note over BlockScheduler: on shutdown set interrupt flag before tearing down interface
Estimated code review effort🎯 4 (Complex) | ⏱️ ~60 minutes Possibly related PRs
Poem
Warning There were issues while running some tools. Please review the errors and either fix the tool's configuration or disable the tool if it's a critical failure. 🔧 Ruff (0.12.2)executorlib/executor/flux.pyruff: error while loading shared libraries: libpthread.so.0: cannot open shared object file: No such file or directory executorlib/executor/single.pyruff: error while loading shared libraries: libpthread.so.0: cannot open shared object file: No such file or directory tests/test_standalone_interactive_communication.pyruff: error while loading shared libraries: libpthread.so.0: cannot open shared object file: No such file or directory
📜 Recent review detailsConfiguration used: CodeRabbit UI Review profile: CHILL Plan: Pro 📒 Files selected for processing (10)
✨ Finishing Touches
🧪 Generate unit tests
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
Codecov Report✅ All modified and coverable lines are covered by tests. Additional details and impacted files@@ Coverage Diff @@
## main #801 +/- ##
==========================================
+ Coverage 97.76% 97.84% +0.07%
==========================================
Files 32 32
Lines 1479 1530 +51
==========================================
+ Hits 1446 1497 +51
Misses 33 33 ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
There was a problem hiding this comment.
Actionable comments posted: 3
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (3)
executorlib/standalone/interactive/spawner.py (1)
128-141: shutdown(wait=False) still blocks — communicate() ignores wait and can hang.communicate() is unnecessary here (stdout/stderr aren’t piped) and causes wait=False to block. Respect the wait flag and add a safe kill fallback.
- if self._process is not None: - self._process.communicate() - self._process.terminate() - if wait: - self._process.wait() - self._process = None + if self._process is not None: + try: + if wait: + self._process.terminate() + self._process.wait() + else: + self._process.terminate() + finally: + self._process = Noneexecutorlib/standalone/interactive/communication.py (1)
156-163: Guard against None socket after _reset_socket to avoid AttributeError.shutdown() calls send_and_receive_dict unconditionally when the process is alive; this crashes when the socket was reset.
- if self._spawner.poll(): + if self._spawner.poll() and self._socket is not None: result = self.send_and_receive_dict( input_dict={"shutdown": True, "wait": wait} ) self._spawner.shutdown(wait=wait)executorlib/task_scheduler/interactive/blockallocation.py (1)
106-115: New workers don’t receive stop_function/worker_id.Scale-up path drops the interrupt semantics. Pass both, mirroring init.
- new_process_lst = [ - Thread( - target=_execute_multiple_tasks, - kwargs=self._process_kwargs, - ) - for _ in range(max_workers - self._max_workers) - ] + new_process_lst = [] + for worker_id in range(self._max_workers, max_workers): + new_process_lst.append( + Thread( + target=_execute_multiple_tasks, + kwargs=self._process_kwargs + | { + "worker_id": worker_id, + "stop_function": lambda: _interrupt_bootup_dict[self._self_id], + }, + ) + )
🧹 Nitpick comments (9)
executorlib/standalone/interactive/spawner.py (1)
29-34: Define precise stop_function type and intent.Use a callable signature to reflect the intended predicate and tighten the contract in both signature and docs.
- def bootup( - self, - command_lst: list[str], - stop_function: Optional[Callable] = None, - ) -> bool: + def bootup( + self, + command_lst: list[str], + stop_function: Optional[Callable[[], bool]] = None, + ) -> bool: @@ - stop_function (Callable): Function to stop the interface. + stop_function (Callable[[], bool], optional): Predicate returning True if boot should be aborted.Also applies to: 39-43
executorlib/task_scheduler/interactive/onetoone.py (1)
233-261: Thread target’s return value is ignored — bool result is misleading.Thread doesn’t capture the returned bool; callers can’t observe it. Either remove the return type/statement or persist the flag via a queue/callback.
-def _execute_task_in_thread(..., **kwargs,) -> bool: +def _execute_task_in_thread(..., **kwargs,) -> None: @@ - return execute_task_dict( + _ = execute_task_dict( ... )If you need the flag, inject a results queue and put() it there.
executorlib/standalone/interactive/communication.py (2)
169-176: Also clear the poller when resetting to avoid stale registrations.- if self._socket is not None: + if self._socket is not None: self._socket.close() - if self._context is not None: + if self._context is not None: self._context.term() self._process = None self._socket = None self._context = None + self._poller = None
212-214: Docstring return type is outdated.interface_bootup now returns Optional[SocketInterface].
- Returns: - executorlib.shared.communication.SocketInterface: socket interface for zmq communication + Returns: + Optional[SocketInterface]: socket interface for ZMQ communication if bootup succeeds, otherwise None.executorlib/task_scheduler/interactive/blockallocation.py (4)
21-21: Global interrupt dict needs cleanup to prevent stale entries.Delete the per-instance key during shutdown after threads join.
172-179: Cleanup interrupt flag on full shutdown.Avoid leaking keys after threads stop.
if wait: for process in self._process: process.join() cancel_items_in_queue(que=self._future_queue) self._future_queue.join() + # Clear interrupt flag for this instance + _interrupt_bootup_dict.pop(self._self_id, None)
234-235: Doc clarity: specify predicate signature for stop_function.- stop_function (Callable): Function to stop the interface. + stop_function (Callable[[], bool]): Predicate returning True if boot should be aborted.
269-279: Avoid double task_done() calls on failure path.reset_task_dict already marks the current task done; calling task_done() beforehand is redundant (currently suppressed) and obscures intent.
- if not result_flag: - task_done(future_queue=future_queue) - reset_task_dict( + if not result_flag: + reset_task_dict( future_obj=f, future_queue=future_queue, task_dict=task_dict ) if interface is not None: - interface.restart() + _ = interface.restart() else: break else: task_done(future_queue=future_queue)executorlib/task_scheduler/interactive/shared.py (1)
73-89: Docstring claims a bool return but function returns None.Align docs with behavior.
def reset_task_dict(future_obj: Future, future_queue: queue.Queue, task_dict: dict): @@ - Returns: - bool: True if the task was submitted successfully, False otherwise. + Returns: + None
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
💡 Knowledge Base configuration:
- MCP integration is disabled by default for public repositories
- Jira integration is disabled by default for public repositories
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (5)
executorlib/standalone/interactive/communication.py(7 hunks)executorlib/standalone/interactive/spawner.py(4 hunks)executorlib/task_scheduler/interactive/blockallocation.py(5 hunks)executorlib/task_scheduler/interactive/onetoone.py(3 hunks)executorlib/task_scheduler/interactive/shared.py(6 hunks)
🧰 Additional context used
🧬 Code graph analysis (4)
executorlib/task_scheduler/interactive/onetoone.py (1)
executorlib/task_scheduler/interactive/shared.py (1)
execute_task_dict(16-59)
executorlib/standalone/interactive/communication.py (1)
executorlib/standalone/interactive/spawner.py (2)
bootup(29-44)bootup(92-114)
executorlib/task_scheduler/interactive/blockallocation.py (4)
executorlib/task_scheduler/interactive/shared.py (3)
execute_task_dict(16-59)reset_task_dict(73-88)task_done(62-70)executorlib/task_scheduler/base.py (3)
_set_process(171-179)future_queue(63-70)shutdown(146-169)executorlib/standalone/queue.py (1)
cancel_items_in_queue(4-19)executorlib/standalone/interactive/communication.py (3)
send_dict(48-59)shutdown(149-163)restart(134-147)
executorlib/task_scheduler/interactive/shared.py (2)
executorlib/standalone/interactive/communication.py (4)
ExecutorlibSocketError(10-11)SocketInterface(14-182)send_and_receive_dict(84-96)shutdown(149-163)executorlib/standalone/hdf.py (1)
get_output(71-87)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (6)
- GitHub Check: benchmark (ubuntu-latest, 3.13, .ci_support/environment-openmpi.yml)
- GitHub Check: notebooks_integration
- GitHub Check: benchmark (ubuntu-latest, 3.13, .ci_support/environment-mpich.yml)
- GitHub Check: unittest_flux_mpich
- GitHub Check: unittest_flux_openmpi
- GitHub Check: notebooks
🔇 Additional comments (6)
executorlib/standalone/interactive/spawner.py (1)
142-149: LGTM: poll() semantics are clear and correct.Returns True only while the process is alive.
executorlib/task_scheduler/interactive/onetoone.py (1)
262-277: Boot failure path may leave futures pending (interface can be None).execute_task_dict will return False when interface is None, but in the OneProcess flow it’s not re-queued/reset like in BlockAllocation. Please verify this path doesn’t strand futures.
executorlib/task_scheduler/interactive/blockallocation.py (2)
72-75: Per-instance interrupt flag initialization looks good.Capturing self_id avoids cross-instance clashes.
76-84: Nice: propagate stop_function to workers.Ensures workers can observe shutdown intent during boot.
executorlib/task_scheduler/interactive/shared.py (2)
6-6: Private API usage (PENDING) — verify compatibility.Future._state and PENDING are internal; confirm your supported Python versions, or consider an alternative reset strategy if this changes upstream.
106-115: Error handling semantics are solid.ExecutorlibSocketError triggers non-fatal False (for retry), other errors set exception and cleanly shutdown.
| self._command_lst = command_lst | ||
| self._stop_function = stop_function | ||
| if not self._spawner.bootup( | ||
| command_lst=command_lst, | ||
| ) | ||
| stop_function=stop_function, | ||
| ): | ||
| self._reset_socket() | ||
| return False | ||
| else: | ||
| return True | ||
|
|
There was a problem hiding this comment.
🛠️ Refactor suggestion
Honor stop_function at the interface layer as well.
Early-abort here too so we don’t even delegate to the spawner when a shutdown is in progress.
- self._command_lst = command_lst
- self._stop_function = stop_function
+ self._command_lst = command_lst
+ self._stop_function = stop_function
+ if stop_function and stop_function():
+ return False
if not self._spawner.bootup(
command_lst=command_lst,
stop_function=stop_function,
):📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| self._command_lst = command_lst | |
| self._stop_function = stop_function | |
| if not self._spawner.bootup( | |
| command_lst=command_lst, | |
| ) | |
| stop_function=stop_function, | |
| ): | |
| self._reset_socket() | |
| return False | |
| else: | |
| return True | |
| self._command_lst = command_lst | |
| self._stop_function = stop_function | |
| if stop_function and stop_function(): | |
| return False | |
| if not self._spawner.bootup( | |
| command_lst=command_lst, | |
| stop_function=stop_function, | |
| ): | |
| self._reset_socket() | |
| return False | |
| else: | |
| return True |
🤖 Prompt for AI Agents
In executorlib/standalone/interactive/communication.py around lines 123 to 133,
the method currently always delegates to the spawner even if a shutdown is
already in progress; call the provided stop_function() immediately after
assigning self._stop_function (and before calling self._spawner.bootup), and if
it indicates shutdown (truthy), call self._reset_socket() and return False so we
avoid delegating to the spawner; otherwise proceed to call self._spawner.bootup
as before.
| if not self._spawner.bootup( | ||
| command_lst=self._command_lst, | ||
| stop_function=self._stop_function, | ||
| ): | ||
| self._reset_socket() | ||
| return False | ||
| return True |
There was a problem hiding this comment.
🛠️ Refactor suggestion
restart() can’t recover after _reset_socket — reinit socket and port.
If _reset_socket was called earlier, _context/_socket are None; re-create them, bind a new port, and update the stored command list before spawning.
- if not self._spawner.bootup(
+ # Reinitialize socket when it was reset previously.
+ if self._socket is None or self._context is None:
+ self._context = zmq.Context()
+ self._socket = self._context.socket(zmq.PAIR)
+ self._poller = zmq.Poller()
+ self._poller.register(self._socket, zmq.POLLIN)
+ port = self.bind_to_random_port()
+ try:
+ idx = self._command_lst.index("--zmqport")
+ self._command_lst[idx + 1] = str(port)
+ except ValueError:
+ self._command_lst += ["--zmqport", str(port)]
+ if not self._spawner.bootup(
command_lst=self._command_lst,
stop_function=self._stop_function,
):📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| if not self._spawner.bootup( | |
| command_lst=self._command_lst, | |
| stop_function=self._stop_function, | |
| ): | |
| self._reset_socket() | |
| return False | |
| return True | |
| # Reinitialize socket when it was reset previously. | |
| if self._socket is None or self._context is None: | |
| self._context = zmq.Context() | |
| self._socket = self._context.socket(zmq.PAIR) | |
| self._poller = zmq.Poller() | |
| self._poller.register(self._socket, zmq.POLLIN) | |
| port = self.bind_to_random_port() | |
| try: | |
| idx = self._command_lst.index("--zmqport") | |
| self._command_lst[idx + 1] = str(port) | |
| except ValueError: | |
| self._command_lst += ["--zmqport", str(port)] | |
| if not self._spawner.bootup( | |
| command_lst=self._command_lst, | |
| stop_function=self._stop_function, | |
| ): | |
| self._reset_socket() | |
| return False | |
| return True |
🤖 Prompt for AI Agents
In executorlib/standalone/interactive/communication.py around lines 141-147,
restart() currently assumes _context and _socket exist after a prior
_reset_socket call; modify restart() to detect if _context or _socket is None
and, when so, recreate the ZMQ context and socket, bind the socket to a new
ephemeral/available port, update the stored command list to include the new
port/address as required, and only then call self._spawner.bootup(...); ensure
_reset_socket clears references so this re-initialization always runs before
spawning and return False on any failure to reinit.
| def bootup( | ||
| self, | ||
| command_lst: list[str], | ||
| ): | ||
| stop_function: Optional[Callable] = None, | ||
| ) -> bool: | ||
| """ |
There was a problem hiding this comment.
Boot-interrupt is currently a no-op — stop_function is accepted but never used.
Without consulting stop_function, shutdown during bootup won’t be interrupted. Short-circuit before spawn, and abort immediately if the signal flips after spawn.
def bootup(
self,
command_lst: list[str],
- stop_function: Optional[Callable] = None,
+ stop_function: Optional[Callable[[], bool]] = None,
) -> bool:
@@
- if self._cwd is not None:
+ if stop_function and stop_function():
+ return False
+ if self._cwd is not None:
os.makedirs(self._cwd, exist_ok=True)
self._process = subprocess.Popen(
args=self.generate_command(command_lst=command_lst),
cwd=self._cwd,
stdin=subprocess.DEVNULL,
)
- return self.poll()
+ # Abort immediately if a stop is requested during/after spawn.
+ if stop_function and stop_function():
+ self.shutdown(wait=False)
+ return False
+ return self.poll()Also applies to: 100-106, 114-114
🤖 Prompt for AI Agents
In executorlib/standalone/interactive/spawner.py around lines 92-97 (also check
100-106 and 114), the bootup method accepts stop_function but never calls it, so
shutdown signals during boot are ignored; modify bootup to check stop_function
immediately before starting the spawn and again right after spawn (and at any
other blocking points) and abort/return False if stop_function indicates a stop,
ensuring you short-circuit before launching the process and after spawn to clean
up; also propagate this check into the logic at lines 100-106 and 114 so any
in-progress boot can be interrupted and resources released.
for more information, see https://pre-commit.ci
for more information, see https://pre-commit.ci
for more information, see https://pre-commit.ci
for more information, see https://pre-commit.ci
for more information, see https://pre-commit.ci
a49e02e to
2a472b5
Compare
Summary by CodeRabbit
New Features
Bug Fixes
Documentation
Tests