Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
121 changes: 86 additions & 35 deletions Lib/multiprocessing/forkserver.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import struct
import sys
import threading
import warnings

from . import connection
from . import process
Expand All @@ -22,7 +23,7 @@
#

MAXFDS_TO_SEND = 256
UNSIGNED_STRUCT = struct.Struct('Q') # large enough for pid_t
SIGNED_STRUCT = struct.Struct('q') # large enough for pid_t

#
# Forkserver class
Expand Down Expand Up @@ -148,21 +149,33 @@ def main(listener_fd, alive_r, preload, main_path=None, sys_path=None):

util._close_stdin()

# ignoring SIGCHLD means no need to reap zombie processes;
sig_r, sig_w = os.pipe()
os.set_blocking(sig_w, False)

def sigchld_handler(*_unused):
try:
os.write(sig_w, b'.')
except BlockingIOError:
pass

# letting SIGINT through avoids KeyboardInterrupt tracebacks
handlers = {
signal.SIGCHLD: signal.SIG_IGN,
signal.SIGCHLD: sigchld_handler,
signal.SIGINT: signal.SIG_DFL,
}
old_handlers = {sig: signal.signal(sig, val)
for (sig, val) in handlers.items()}

# map child pids to client fds
pid_to_fd = {}

with socket.socket(socket.AF_UNIX, fileno=listener_fd) as listener, \
selectors.DefaultSelector() as selector:
_forkserver._forkserver_address = listener.getsockname()

selector.register(listener, selectors.EVENT_READ)
selector.register(alive_r, selectors.EVENT_READ)
selector.register(sig_r, selectors.EVENT_READ)

while True:
try:
Expand All @@ -176,62 +189,100 @@ def main(listener_fd, alive_r, preload, main_path=None, sys_path=None):
assert os.read(alive_r, 1) == b''
raise SystemExit

assert listener in rfds
with listener.accept()[0] as s:
code = 1
if os.fork() == 0:
if sig_r in rfds:
# Got SIGCHLD
os.read(sig_r, 65536) # exhaust
while True:
# Scan for child processes
try:
_serve_one(s, listener, alive_r, old_handlers)
except Exception:
sys.excepthook(*sys.exc_info())
sys.stderr.flush()
finally:
os._exit(code)
pid, sts = os.waitpid(-1, os.WNOHANG)
except ChildProcessError:
break
if pid == 0:
break
child_w = pid_to_fd.pop(pid, None)
if child_w is not None:
if os.WIFSIGNALED(sts):
returncode = -os.WTERMSIG(sts)
else:
assert os.WIFEXITED(sts)
returncode = os.WEXITSTATUS(sts)
# Write the exit code to the pipe
write_signed(child_w, returncode)
os.close(child_w)
else:
# This shouldn't happen really
warnings.warn('forkserver: waitpid returned '
'unexpected pid %d' % pid)

if listener in rfds:
# Incoming fork request
with listener.accept()[0] as s:
# Receive fds from client
fds = reduction.recvfds(s, MAXFDS_TO_SEND + 1)
assert len(fds) <= MAXFDS_TO_SEND
child_r, child_w, *fds = fds
s.close()
pid = os.fork()
if pid == 0:
# Child
code = 1
try:
listener.close()
code = _serve_one(child_r, fds,
(alive_r, child_w, sig_r, sig_w),
old_handlers)
except Exception:
sys.excepthook(*sys.exc_info())
sys.stderr.flush()
finally:
os._exit(code)
else:
# Send pid to client processes
write_signed(child_w, pid)
pid_to_fd[pid] = child_w
os.close(child_r)
for fd in fds:
os.close(fd)

except OSError as e:
if e.errno != errno.ECONNABORTED:
raise

def _serve_one(s, listener, alive_r, handlers):

def _serve_one(child_r, fds, unused_fds, handlers):
# close unnecessary stuff and reset signal handlers
listener.close()
os.close(alive_r)
for sig, val in handlers.items():
signal.signal(sig, val)
for fd in unused_fds:
os.close(fd)

# receive fds from parent process
fds = reduction.recvfds(s, MAXFDS_TO_SEND + 1)
s.close()
assert len(fds) <= MAXFDS_TO_SEND
(child_r, child_w, _forkserver._forkserver_alive_fd,
stfd, *_forkserver._inherited_fds) = fds
semaphore_tracker._semaphore_tracker._fd = stfd

# send pid to client processes
write_unsigned(child_w, os.getpid())
(_forkserver._forkserver_alive_fd,
semaphore_tracker._semaphore_tracker._fd,
*_forkserver._inherited_fds) = fds

# run process object received over pipe
# Run process object received over pipe
code = spawn._main(child_r)

# write the exit code to the pipe
write_unsigned(child_w, code)
return code


#
# Read and write unsigned numbers
# Read and write signed numbers
#

def read_unsigned(fd):
def read_signed(fd):
data = b''
length = UNSIGNED_STRUCT.size
length = SIGNED_STRUCT.size
while len(data) < length:
s = os.read(fd, length - len(data))
if not s:
raise EOFError('unexpected EOF')
data += s
return UNSIGNED_STRUCT.unpack(data)[0]
return SIGNED_STRUCT.unpack(data)[0]

def write_unsigned(fd, n):
msg = UNSIGNED_STRUCT.pack(n)
def write_signed(fd, n):
msg = SIGNED_STRUCT.pack(n)
while msg:
nbytes = os.write(fd, msg)
if nbytes == 0:
Expand Down
15 changes: 6 additions & 9 deletions Lib/multiprocessing/popen_fork.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,12 @@ def duplicate_for_child(self, fd):

def poll(self, flag=os.WNOHANG):
if self.returncode is None:
while True:
try:
pid, sts = os.waitpid(self.pid, flag)
except OSError as e:
# Child process not yet created. See #1731717
# e.errno == errno.ECHILD == 10
return None
else:
break
try:
pid, sts = os.waitpid(self.pid, flag)
except OSError as e:
# Child process not yet created. See #1731717
# e.errno == errno.ECHILD == 10
return None
if pid == self.pid:
if os.WIFSIGNALED(sts):
self.returncode = -os.WTERMSIG(sts)
Expand Down
8 changes: 5 additions & 3 deletions Lib/multiprocessing/popen_forkserver.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ def _launch(self, process_obj):
util.Finalize(self, os.close, (self.sentinel,))
with open(w, 'wb', closefd=True) as f:
f.write(buf.getbuffer())
self.pid = forkserver.read_unsigned(self.sentinel)
self.pid = forkserver.read_signed(self.sentinel)

def poll(self, flag=os.WNOHANG):
if self.returncode is None:
Expand All @@ -61,8 +61,10 @@ def poll(self, flag=os.WNOHANG):
if not wait([self.sentinel], timeout):
return None
try:
self.returncode = forkserver.read_unsigned(self.sentinel)
self.returncode = forkserver.read_signed(self.sentinel)
except (OSError, EOFError):
# The process ended abnormally perhaps because of a signal
# This should not happen usually, but perhaps the forkserver
# process itself got killed
self.returncode = 255

return self.returncode
39 changes: 37 additions & 2 deletions Lib/test/_test_multiprocessing.py
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,10 @@ def test_process(self):
def _test_terminate(cls):
time.sleep(100)

@classmethod
def _test_sleep(cls, delay):
time.sleep(delay)

def test_terminate(self):
if self.TYPE == 'threads':
self.skipTest('test not appropriate for {}'.format(self.TYPE))
Expand Down Expand Up @@ -323,8 +327,9 @@ def handler(*args):

p.join()

# XXX sometimes get p.exitcode == 0 on Windows ...
#self.assertEqual(p.exitcode, -signal.SIGTERM)
# sometimes get p.exitcode == 0 on Windows ...
if os.name != 'nt':
self.assertEqual(p.exitcode, -signal.SIGTERM)

def test_cpu_count(self):
try:
Expand Down Expand Up @@ -398,6 +403,36 @@ def test_sentinel(self):
p.join()
self.assertTrue(wait_for_handle(sentinel, timeout=1))

def test_many_processes(self):
if self.TYPE == 'threads':
self.skipTest('test not appropriate for {}'.format(self.TYPE))

sm = multiprocessing.get_start_method()
N = 5 if sm == 'spawn' else 100

# Try to overwhelm the forkserver loop with events
procs = [self.Process(target=self._test_sleep, args=(0.01,))
for i in range(N)]
for p in procs:
p.start()
for p in procs:
p.join(timeout=10)
for p in procs:
self.assertEqual(p.exitcode, 0)

procs = [self.Process(target=self._test_terminate)
for i in range(N)]
for p in procs:
p.start()
time.sleep(0.001) # let the children start...
for p in procs:
p.terminate()
for p in procs:
p.join(timeout=10)
if os.name != 'nt':
for p in procs:
self.assertEqual(p.exitcode, -signal.SIGTERM)

#
#
#
Expand Down
4 changes: 4 additions & 0 deletions Misc/NEWS
Original file line number Diff line number Diff line change
Expand Up @@ -362,6 +362,10 @@ Extension Modules
Library
-------

- bpo-30589: Fix multiprocessing.Process.exitcode to return the opposite
of the signal number when the process is killed by a signal (instead
of 255) when using the "forkserver" method.

- bpo-28994: The traceback no longer displayed for SystemExit raised in
a callback registered by atexit.

Expand Down