Skip to content

Commit

Permalink
avoid starting IOPub background thread after it's been stopped (#1137)
Browse files Browse the repository at this point in the history
  • Loading branch information
minrk authored Sep 2, 2023
1 parent 44c1759 commit ca79e2e
Showing 1 changed file with 18 additions and 3 deletions.
21 changes: 18 additions & 3 deletions ipykernel/iostream.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ def __init__(self, socket, pipe=False):
piped from subprocesses.
"""
self.socket = socket
self._stopped = False
self.background_socket = BackgroundSocket(self)
self._master_pid = os.getpid()
self._pipe_flag = pipe
Expand All @@ -83,13 +84,21 @@ def _start_event_gc():
self._event_pipe_gc_task = asyncio.ensure_future(self._run_event_pipe_gc())

self.io_loop.run_sync(_start_event_gc)
self.io_loop.start()

if not self._stopped:
# avoid race if stop called before start thread gets here
# probably only comes up in tests
self.io_loop.start()

if self._event_pipe_gc_task is not None:
# cancel gc task to avoid pending task warnings
async def _cancel():
self._event_pipe_gc_task.cancel() # type:ignore

self.io_loop.run_sync(_cancel)
try:
self.io_loop.run_sync(_cancel)
except TimeoutError:
pass
self.io_loop.close(all_fds=True)

def _setup_event_pipe(self):
Expand Down Expand Up @@ -219,10 +228,16 @@ def start(self):

def stop(self):
"""Stop the IOPub thread"""
self._stopped = True
if not self.thread.is_alive():
return
self.io_loop.add_callback(self.io_loop.stop)
self.thread.join()

self.thread.join(timeout=30)
if self.thread.is_alive():
# avoid infinite hang if stop fails
msg = "IOPub thread did not terminate in 30 seconds"
raise TimeoutError(msg)
# close *all* event pipes, created in any thread
# event pipes can only be used from other threads while self.thread.is_alive()
# so after thread.join, this should be safe
Expand Down

0 comments on commit ca79e2e

Please sign in to comment.