Skip to content

Commit

Permalink
bpo-42110: prototype fix for MultiLoopChildWatcher
Browse files Browse the repository at this point in the history
  • Loading branch information
sourcejedi committed Oct 22, 2020
1 parent c60394c commit 92f979b
Show file tree
Hide file tree
Showing 2 changed files with 108 additions and 56 deletions.
145 changes: 89 additions & 56 deletions Lib/asyncio/unix_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -1202,6 +1202,79 @@ def _do_waitpid_all(self):
callback(pid, returncode, *args)


# Internal helper for MultiLoopChildWatcher.
# Manage the child handlers for a single event loop.
# So all accesses to this are from the same thread.
class _LoopChildWatcher:
def __init__(self, loop):
self._loop = loop
self._callbacks = {}

def add_child_handler(self, pid, callback, *args):
self._callbacks[pid] = (callback, args)

# Prevent a race condition in case the child is already terminated.
self._do_waitpid(pid)

def remove_child_handler(self, pid):
try:
del self._callbacks[pid]
return True
except KeyError:
return False

def empty(self):
return not self._callbacks

def _do_waitpid(self, expected_pid):
assert expected_pid > 0

try:
pid, status = os.waitpid(expected_pid, os.WNOHANG)
except ChildProcessError:
# The child process is already reaped
# (may happen if waitpid() is called elsewhere).
pid = expected_pid
returncode = 255
logger.warning(
"Unknown child process pid %d, will report returncode 255",
pid)
debug_log = False
else:
if pid == 0:
# The child process is still alive.
return

returncode = _compute_returncode(status)
debug_log = True
try:
callback, args = self._callbacks.pop(pid)
except KeyError: # pragma: no cover
# May happen if .remove_child_handler() is called
# after os.waitpid() returns.
logger.warning("Child watcher got an unexpected pid: %r",
pid, exc_info=True)
else:
if debug_log and self._loop.get_debug():
logger.debug('process %s exited with returncode %s',
expected_pid, returncode)
self._loop.call_soon(callback, pid, returncode, *args)

def do_waitpid_all(self):
try:
for pid in list(self._callbacks):
self._do_waitpid(pid)
except (SystemExit, KeyboardInterrupt):
raise
except BaseException:
# self._loop should always be available here
# as we are only called via loop.call_soon_threadsafe()
self._loop.call_exception_handler({
'message': 'Unknown exception in SIGCHLD handler',
'exception': exc,
})


class MultiLoopChildWatcher(AbstractChildWatcher):
"""A watcher that doesn't require running loop in the main thread.
Expand All @@ -1222,14 +1295,14 @@ class MultiLoopChildWatcher(AbstractChildWatcher):
# but retrieves the current loop by get_running_loop()

def __init__(self):
self._callbacks = {}
self._loops = {} # event loop -> _LoopChildWatcher
self._saved_sighandler = None

def is_active(self):
return self._saved_sighandler is not None

def close(self):
self._callbacks.clear()
self._loops.clear()
if self._saved_sighandler is not None:
handler = signal.getsignal(signal.SIGCHLD)
if handler != self._sig_chld:
Expand All @@ -1246,17 +1319,18 @@ def __exit__(self, exc_type, exc_val, exc_tb):

def add_child_handler(self, pid, callback, *args):
loop = events.get_running_loop()
self._callbacks[pid] = (loop, callback, args)

# Prevent a race condition in case the child is already terminated.
self._do_waitpid(pid)
if not loop in self._loops:
self._loops[loop] = _LoopChildWatcher(loop)
watcher = self._loops[loop]
watcher.add_child_handler(pid, callback, *args)

def remove_child_handler(self, pid):
try:
del self._callbacks[pid]
return True
except KeyError:
if not loop in self._loops:
return False
watcher = self._loops[loop]
ret = watcher.remove_child_handler(pid)
if watcher.empty():
del self._loops[loop]

def attach_loop(self, loop):
# Don't save the loop but initialize itself if called first time
Expand All @@ -1273,54 +1347,13 @@ def attach_loop(self, loop):
# Set SA_RESTART to limit EINTR occurrences.
signal.siginterrupt(signal.SIGCHLD, False)

def _do_waitpid_all(self):
for pid in list(self._callbacks):
self._do_waitpid(pid)

def _do_waitpid(self, expected_pid):
assert expected_pid > 0

try:
pid, status = os.waitpid(expected_pid, os.WNOHANG)
except ChildProcessError:
# The child process is already reaped
# (may happen if waitpid() is called elsewhere).
pid = expected_pid
returncode = 255
logger.warning(
"Unknown child process pid %d, will report returncode 255",
pid)
debug_log = False
else:
if pid == 0:
# The child process is still alive.
return

returncode = _compute_returncode(status)
debug_log = True
try:
loop, callback, args = self._callbacks.pop(pid)
except KeyError: # pragma: no cover
# May happen if .remove_child_handler() is called
# after os.waitpid() returns.
logger.warning("Child watcher got an unexpected pid: %r",
pid, exc_info=True)
else:
def _sig_chld(self, signum, frame):
for loop, watcher in self._loops.items():
# TODO - is this good enough? can we do better?
if loop.is_closed():
logger.warning("Loop %r that handles pid %r is closed", loop, pid)
logger.warning("Loop %r is closed, but it still had running subprocesses", loop)
else:
if debug_log and loop.get_debug():
logger.debug('process %s exited with returncode %s',
expected_pid, returncode)
loop.call_soon_threadsafe(callback, pid, returncode, *args)

def _sig_chld(self, signum, frame):
try:
self._do_waitpid_all()
except (SystemExit, KeyboardInterrupt):
raise
except BaseException:
logger.warning('Unknown exception in SIGCHLD handler', exc_info=True)
loop.call_soon_threadsafe(watcher.do_waitpid_all)


class ThreadedChildWatcher(AbstractChildWatcher):
Expand Down
19 changes: 19 additions & 0 deletions Lib/test/test_asyncio/test_subprocess.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import os
import signal
import sys
import time
import unittest
import warnings
from unittest import mock
Expand Down Expand Up @@ -655,6 +656,24 @@ async def go():
await proc.wait()
self.loop.run_until_complete(go())

def test_kill_before_wait(self):
# Patch os.kill to call sleep(0.1) first,
# opening up the window for a race condition.
os_kill = os.kill
def kill(pid, signum):
time.sleep(0.1)
os_kill(pid, signum)

with mock.patch('subprocess.os.kill', kill):
async def go():
p = await asyncio.create_subprocess_shell("exit 0")
try:
p.send_signal(signal.SIGTERM)
finally:
# cleanup
await p.wait()
self.loop.run_until_complete(go())


if sys.platform != 'win32':
# Unix
Expand Down

0 comments on commit 92f979b

Please sign in to comment.