Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

bpo-31173: Rewrite WSTOPSIG test of test_subprocess #3055

Merged
merged 1 commit into from
Aug 10, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 22 additions & 35 deletions Lib/test/test_subprocess.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,11 @@
except ImportError:
threading = None

try:
import _testcapi
except ImportError:
_testcapi = None

if support.PGO:
raise unittest.SkipTest("test is not helpful for PGO")

Expand Down Expand Up @@ -2565,42 +2570,24 @@ def test_communicate_BrokenPipeError_stdin_close_with_timeout(self):
proc.communicate(timeout=999)
mock_proc_stdin.close.assert_called_once_with()

@unittest.skipIf(not ctypes, 'ctypes module required')
@unittest.skipIf(not sys.executable, 'Test requires sys.executable')
def test_child_terminated_in_stopped_state(self):
@unittest.skipUnless(_testcapi is not None
and hasattr(_testcapi, 'W_STOPCODE'),
'need _testcapi.W_STOPCODE')
def test_stopped(self):
"""Test wait() behavior when waitpid returns WIFSTOPPED; issue29335."""
PTRACE_TRACEME = 0 # From glibc and MacOS (PT_TRACE_ME).
libc_name = ctypes.util.find_library('c')
libc = ctypes.CDLL(libc_name)
if not hasattr(libc, 'ptrace'):
raise unittest.SkipTest('ptrace() required')

code = textwrap.dedent(f"""
import ctypes
import faulthandler
from test.support import SuppressCrashReport

libc = ctypes.CDLL({libc_name!r})
libc.ptrace({PTRACE_TRACEME}, 0, 0)
""")

child = subprocess.Popen([sys.executable, '-c', code])
if child.wait() != 0:
raise unittest.SkipTest('ptrace() failed - unable to test')

code += textwrap.dedent(f"""
with SuppressCrashReport():
# Crash the process
faulthandler._sigsegv()
""")
child = subprocess.Popen([sys.executable, '-c', code])
try:
returncode = child.wait()
except:
child.kill() # Clean up the hung stopped process.
raise
self.assertNotEqual(0, returncode)
self.assertLess(returncode, 0) # signal death, likely SIGSEGV.
args = [sys.executable, '-c', 'pass']
proc = subprocess.Popen(args)

# Wait until the real process completes to avoid zombie process
pid = proc.pid
pid, status = os.waitpid(pid, 0)
self.assertEqual(status, 0)

status = _testcapi.W_STOPCODE(3)
with mock.patch('subprocess.os.waitpid', return_value=(pid, status)):
returncode = proc.wait()

self.assertEqual(returncode, -3)


@unittest.skipUnless(mswindows, "Windows specific tests")
Expand Down
22 changes: 22 additions & 0 deletions Modules/_testcapimodule.c
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@
# include <winsock2.h> /* struct timeval */
#endif

#ifdef HAVE_SYS_WAIT_H
#include <sys/wait.h> /* For W_STOPCODE */
#endif

#ifdef WITH_THREAD
#include "pythread.h"
#endif /* WITH_THREAD */
Expand Down Expand Up @@ -4272,6 +4276,7 @@ test_pyobject_fastcallkeywords(PyObject *self, PyObject *args)
return _PyObject_FastCallKeywords(func, stack, nargs, kwnames);
}


static PyObject*
stack_pointer(PyObject *self, PyObject *args)
{
Expand All @@ -4280,6 +4285,20 @@ stack_pointer(PyObject *self, PyObject *args)
}


#ifdef W_STOPCODE
static PyObject*
py_w_stopcode(PyObject *self, PyObject *args)
{
int sig, status;
if (!PyArg_ParseTuple(args, "i", &sig)) {
return NULL;
}
status = W_STOPCODE(sig);
return PyLong_FromLong(status);
}
#endif


static PyMethodDef TestMethods[] = {
{"raise_exception", raise_exception, METH_VARARGS},
{"raise_memoryerror", (PyCFunction)raise_memoryerror, METH_NOARGS},
Expand Down Expand Up @@ -4493,6 +4512,9 @@ static PyMethodDef TestMethods[] = {
{"pyobject_fastcalldict", test_pyobject_fastcalldict, METH_VARARGS},
{"pyobject_fastcallkeywords", test_pyobject_fastcallkeywords, METH_VARARGS},
{"stack_pointer", stack_pointer, METH_NOARGS},
#ifdef W_STOPCODE
{"W_STOPCODE", py_w_stopcode, METH_VARARGS},
#endif
{NULL, NULL} /* sentinel */
};

Expand Down