From 92f979bce4582e807facb1c274a962b3caf0d2eb Mon Sep 17 00:00:00 2001 From: Alan Jenkins Date: Thu, 22 Oct 2020 12:28:11 +0100 Subject: [PATCH] bpo-42110: prototype fix for MultiLoopChildWatcher --- Lib/asyncio/unix_events.py | 145 ++++++++++++++--------- Lib/test/test_asyncio/test_subprocess.py | 19 +++ 2 files changed, 108 insertions(+), 56 deletions(-) diff --git a/Lib/asyncio/unix_events.py b/Lib/asyncio/unix_events.py index f34a5b4b443736..70bbee662caff1 100644 --- a/Lib/asyncio/unix_events.py +++ b/Lib/asyncio/unix_events.py @@ -1202,6 +1202,79 @@ def _do_waitpid_all(self): callback(pid, returncode, *args) +# Internal helper for MultiLoopChildWatcher. +# Manage the child handlers for a single event loop. +# So all accesses to this are from the same thread. +class _LoopChildWatcher: + def __init__(self, loop): + self._loop = loop + self._callbacks = {} + + def add_child_handler(self, pid, callback, *args): + self._callbacks[pid] = (callback, args) + + # Prevent a race condition in case the child is already terminated. + self._do_waitpid(pid) + + def remove_child_handler(self, pid): + try: + del self._callbacks[pid] + return True + except KeyError: + return False + + def empty(self): + return not self._callbacks + + def _do_waitpid(self, expected_pid): + assert expected_pid > 0 + + try: + pid, status = os.waitpid(expected_pid, os.WNOHANG) + except ChildProcessError: + # The child process is already reaped + # (may happen if waitpid() is called elsewhere). + pid = expected_pid + returncode = 255 + logger.warning( + "Unknown child process pid %d, will report returncode 255", + pid) + debug_log = False + else: + if pid == 0: + # The child process is still alive. + return + + returncode = _compute_returncode(status) + debug_log = True + try: + callback, args = self._callbacks.pop(pid) + except KeyError: # pragma: no cover + # May happen if .remove_child_handler() is called + # after os.waitpid() returns. + logger.warning("Child watcher got an unexpected pid: %r", + pid, exc_info=True) + else: + if debug_log and self._loop.get_debug(): + logger.debug('process %s exited with returncode %s', + expected_pid, returncode) + self._loop.call_soon(callback, pid, returncode, *args) + + def do_waitpid_all(self): + try: + for pid in list(self._callbacks): + self._do_waitpid(pid) + except (SystemExit, KeyboardInterrupt): + raise + except BaseException: + # self._loop should always be available here + # as we are only called via loop.call_soon_threadsafe() + self._loop.call_exception_handler({ + 'message': 'Unknown exception in SIGCHLD handler', + 'exception': exc, + }) + + class MultiLoopChildWatcher(AbstractChildWatcher): """A watcher that doesn't require running loop in the main thread. @@ -1222,14 +1295,14 @@ class MultiLoopChildWatcher(AbstractChildWatcher): # but retrieves the current loop by get_running_loop() def __init__(self): - self._callbacks = {} + self._loops = {} # event loop -> _LoopChildWatcher self._saved_sighandler = None def is_active(self): return self._saved_sighandler is not None def close(self): - self._callbacks.clear() + self._loops.clear() if self._saved_sighandler is not None: handler = signal.getsignal(signal.SIGCHLD) if handler != self._sig_chld: @@ -1246,17 +1319,18 @@ def __exit__(self, exc_type, exc_val, exc_tb): def add_child_handler(self, pid, callback, *args): loop = events.get_running_loop() - self._callbacks[pid] = (loop, callback, args) - - # Prevent a race condition in case the child is already terminated. - self._do_waitpid(pid) + if not loop in self._loops: + self._loops[loop] = _LoopChildWatcher(loop) + watcher = self._loops[loop] + watcher.add_child_handler(pid, callback, *args) def remove_child_handler(self, pid): - try: - del self._callbacks[pid] - return True - except KeyError: + if not loop in self._loops: return False + watcher = self._loops[loop] + ret = watcher.remove_child_handler(pid) + if watcher.empty(): + del self._loops[loop] def attach_loop(self, loop): # Don't save the loop but initialize itself if called first time @@ -1273,54 +1347,13 @@ def attach_loop(self, loop): # Set SA_RESTART to limit EINTR occurrences. signal.siginterrupt(signal.SIGCHLD, False) - def _do_waitpid_all(self): - for pid in list(self._callbacks): - self._do_waitpid(pid) - - def _do_waitpid(self, expected_pid): - assert expected_pid > 0 - - try: - pid, status = os.waitpid(expected_pid, os.WNOHANG) - except ChildProcessError: - # The child process is already reaped - # (may happen if waitpid() is called elsewhere). - pid = expected_pid - returncode = 255 - logger.warning( - "Unknown child process pid %d, will report returncode 255", - pid) - debug_log = False - else: - if pid == 0: - # The child process is still alive. - return - - returncode = _compute_returncode(status) - debug_log = True - try: - loop, callback, args = self._callbacks.pop(pid) - except KeyError: # pragma: no cover - # May happen if .remove_child_handler() is called - # after os.waitpid() returns. - logger.warning("Child watcher got an unexpected pid: %r", - pid, exc_info=True) - else: + def _sig_chld(self, signum, frame): + for loop, watcher in self._loops.items(): + # TODO - is this good enough? can we do better? if loop.is_closed(): - logger.warning("Loop %r that handles pid %r is closed", loop, pid) + logger.warning("Loop %r is closed, but it still had running subprocesses", loop) else: - if debug_log and loop.get_debug(): - logger.debug('process %s exited with returncode %s', - expected_pid, returncode) - loop.call_soon_threadsafe(callback, pid, returncode, *args) - - def _sig_chld(self, signum, frame): - try: - self._do_waitpid_all() - except (SystemExit, KeyboardInterrupt): - raise - except BaseException: - logger.warning('Unknown exception in SIGCHLD handler', exc_info=True) + loop.call_soon_threadsafe(watcher.do_waitpid_all) class ThreadedChildWatcher(AbstractChildWatcher): diff --git a/Lib/test/test_asyncio/test_subprocess.py b/Lib/test/test_asyncio/test_subprocess.py index 177a02cdcc1744..4732ef06719d2f 100644 --- a/Lib/test/test_asyncio/test_subprocess.py +++ b/Lib/test/test_asyncio/test_subprocess.py @@ -1,6 +1,7 @@ import os import signal import sys +import time import unittest import warnings from unittest import mock @@ -655,6 +656,24 @@ async def go(): await proc.wait() self.loop.run_until_complete(go()) + def test_kill_before_wait(self): + # Patch os.kill to call sleep(0.1) first, + # opening up the window for a race condition. + os_kill = os.kill + def kill(pid, signum): + time.sleep(0.1) + os_kill(pid, signum) + + with mock.patch('subprocess.os.kill', kill): + async def go(): + p = await asyncio.create_subprocess_shell("exit 0") + try: + p.send_signal(signal.SIGTERM) + finally: + # cleanup + await p.wait() + self.loop.run_until_complete(go()) + if sys.platform != 'win32': # Unix