Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

gh-127933: Add option to run regression tests in parallel #128003

Merged
merged 11 commits into from
Feb 4, 2025
5 changes: 5 additions & 0 deletions Doc/library/test.rst
Original file line number Diff line number Diff line change
Expand Up @@ -792,6 +792,11 @@ The :mod:`test.support` module defines the following functions:
Decorator for invoking :func:`check_impl_detail` on *guards*. If that
returns ``False``, then uses *msg* as the reason for skipping the test.

.. decorator:: thread_unsafe(reason=None)

Decorator for marking tests as thread-unsafe. This test always runs in one
thread even when invoked with ``--parallel-threads``.


.. decorator:: no_tracing

Expand Down
5 changes: 5 additions & 0 deletions Lib/test/libregrtest/cmdline.py
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,7 @@ def __init__(self, **kwargs) -> None:
self.print_slow = False
self.random_seed = None
self.use_mp = None
self.parallel_threads = None
self.forever = False
self.header = False
self.failfast = False
Expand Down Expand Up @@ -316,6 +317,10 @@ def _create_parser():
'a single process, ignore -jN option, '
'and failed tests are also rerun sequentially '
'in the same process')
group.add_argument('--parallel-threads', metavar='PARALLEL_THREADS',
type=int,
help='run copies of each test in PARALLEL_THREADS at '
'once')
group.add_argument('-T', '--coverage', action='store_true',
dest='trace',
help='turn on code coverage tracing using the trace '
Expand Down
3 changes: 3 additions & 0 deletions Lib/test/libregrtest/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,8 @@ def __init__(self, ns: Namespace, _add_python_opts: bool = False):
else:
self.random_seed = ns.random_seed

self.parallel_threads = ns.parallel_threads

# tests
self.first_runtests: RunTests | None = None

Expand Down Expand Up @@ -506,6 +508,7 @@ def create_run_tests(self, tests: TestTuple) -> RunTests:
python_cmd=self.python_cmd,
randomize=self.randomize,
random_seed=self.random_seed,
parallel_threads=self.parallel_threads,
)

def _run_tests(self, selected: TestTuple, tests: TestList | None) -> int:
Expand Down
79 changes: 79 additions & 0 deletions Lib/test/libregrtest/parallel_case.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
"""Run a test case multiple times in parallel threads."""

import copy
import functools
import threading
import unittest

from unittest import TestCase


class ParallelTestCase(TestCase):
def __init__(self, test_case: TestCase, num_threads: int):
self.test_case = test_case
self.num_threads = num_threads
self._testMethodName = test_case._testMethodName
self._testMethodDoc = test_case._testMethodDoc

def __str__(self):
return f"{str(self.test_case)} [threads={self.num_threads}]"

def run_worker(self, test_case: TestCase, result: unittest.TestResult,
barrier: threading.Barrier):
barrier.wait()
test_case.run(result)

def run(self, result=None):
if result is None:
result = test_case.defaultTestResult()
startTestRun = getattr(result, 'startTestRun', None)
stopTestRun = getattr(result, 'stopTestRun', None)
if startTestRun is not None:
startTestRun()
else:
stopTestRun = None

# Called at the beginning of each test. See TestCase.run.
result.startTest(self)

cases = [copy.copy(self.test_case) for _ in range(self.num_threads)]
results = [unittest.TestResult() for _ in range(self.num_threads)]

barrier = threading.Barrier(self.num_threads)
threads = []
for i, (case, r) in enumerate(zip(cases, results)):
thread = threading.Thread(target=self.run_worker,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It might be nice to give these threads an informative name, something containing the testcase and the Nth thread it is.

args=(case, r, barrier),
name=f"{str(self.test_case)}-{i}",
daemon=True)
threads.append(thread)

for thread in threads:
thread.start()

for threads in threads:
threads.join()

# Aggregate test results
if all(r.wasSuccessful() for r in results):
result.addSuccess(self)

# Note: We can't call result.addError, result.addFailure, etc. because
# we no longer have the original exception, just the string format.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FWIW, another reason not to call those is that they raise right away if failfast is set.

for r in results:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks like something that might be worth adding to unittest.result.TestResult, something to aggregate multiple results. (Not for this PR, but it might be a nice small feature for someone to work on.)

if len(r.errors) > 0 or len(r.failures) > 0:
result._mirrorOutput = True
result.errors.extend(r.errors)
result.failures.extend(r.failures)
result.skipped.extend(r.skipped)
result.expectedFailures.extend(r.expectedFailures)
result.unexpectedSuccesses.extend(r.unexpectedSuccesses)
result.collectedDurations.extend(r.collectedDurations)

if any(r.shouldStop for r in results):
result.stop()

# Test has finished running
result.stopTest(self)
if stopTestRun is not None:
stopTestRun()
3 changes: 3 additions & 0 deletions Lib/test/libregrtest/runtests.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ class RunTests:
python_cmd: tuple[str, ...] | None
randomize: bool
random_seed: int | str
parallel_threads: int | None

def copy(self, **override) -> 'RunTests':
state = dataclasses.asdict(self)
Expand Down Expand Up @@ -184,6 +185,8 @@ def bisect_cmd_args(self) -> list[str]:
args.extend(("--python", cmd))
if self.randomize:
args.append(f"--randomize")
if self.parallel_threads:
args.append(f"--parallel-threads={self.parallel_threads}")
args.append(f"--randseed={self.random_seed}")
return args

Expand Down
30 changes: 28 additions & 2 deletions Lib/test/libregrtest/single.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from .save_env import saved_test_environment
from .setup import setup_tests
from .testresult import get_test_runner
from .parallel_case import ParallelTestCase
from .utils import (
TestName,
clear_caches, remove_testfn, abs_module_name, print_warning)
Expand All @@ -27,14 +28,17 @@
PROGRESS_MIN_TIME = 30.0 # seconds


def run_unittest(test_mod):
def run_unittest(test_mod, runtests: RunTests):
loader = unittest.TestLoader()
tests = loader.loadTestsFromModule(test_mod)

for error in loader.errors:
print(error, file=sys.stderr)
if loader.errors:
raise Exception("errors while loading tests")
_filter_suite(tests, match_test)
if runtests.parallel_threads:
_parallelize_tests(tests, runtests.parallel_threads)
return _run_suite(tests)

def _filter_suite(suite, pred):
Expand All @@ -49,6 +53,28 @@ def _filter_suite(suite, pred):
newtests.append(test)
suite._tests = newtests

def _parallelize_tests(suite, parallel_threads: int):
def is_thread_unsafe(test):
test_method = getattr(test, test._testMethodName)
instance = test_method.__self__
return (getattr(test_method, "__unittest_thread_unsafe__", False) or
getattr(instance, "__unittest_thread_unsafe__", False))

newtests: list[object] = []
for test in suite._tests:
if isinstance(test, unittest.TestSuite):
_parallelize_tests(test, parallel_threads)
newtests.append(test)
continue

if is_thread_unsafe(test):
# Don't parallelize thread-unsafe tests
newtests.append(test)
continue

newtests.append(ParallelTestCase(test, parallel_threads))
suite._tests = newtests

def _run_suite(suite):
"""Run tests from a unittest.TestSuite-derived class."""
runner = get_test_runner(sys.stdout,
Expand Down Expand Up @@ -133,7 +159,7 @@ def _load_run_test(result: TestResult, runtests: RunTests) -> None:
raise Exception(f"Module {test_name} defines test_main() which "
f"is no longer supported by regrtest")
def test_func():
return run_unittest(test_mod)
return run_unittest(test_mod, runtests)

try:
regrtest_runner(result, test_func, runtests)
Expand Down
17 changes: 16 additions & 1 deletion Lib/test/support/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
"anticipate_failure", "load_package_tests", "detect_api_mismatch",
"check__all__", "skip_if_buggy_ucrt_strfptime",
"check_disallow_instantiation", "check_sanitizer", "skip_if_sanitizer",
"requires_limited_api", "requires_specialization",
"requires_limited_api", "requires_specialization", "thread_unsafe",
# sys
"MS_WINDOWS", "is_jython", "is_android", "is_emscripten", "is_wasi",
"is_apple_mobile", "check_impl_detail", "unix_shell", "setswitchinterval",
Expand Down Expand Up @@ -382,6 +382,21 @@ def wrapper(*args, **kw):
return decorator


def thread_unsafe(reason):
"""Mark a test as not thread safe. When the test runner is run with
--parallel-threads=N, the test will be run in a single thread."""
def decorator(test_item):
test_item.__unittest_thread_unsafe__ = True
# the reason is not currently used
test_item.__unittest_thread_unsafe__why__ = reason
return test_item
if isinstance(reason, types.FunctionType):
test_item = reason
reason = ''
return decorator(test_item)
return decorator


def skip_if_buildbot(reason=None):
"""Decorator raising SkipTest if running on a buildbot."""
import getpass
Expand Down
2 changes: 2 additions & 0 deletions Lib/test/test_class.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
"Test the functionality of Python classes implementing operators."

import unittest
from test import support
from test.support import cpython_only, import_helper, script_helper, skip_emscripten_stack_overflow

testmeths = [
Expand Down Expand Up @@ -134,6 +135,7 @@ def __%s__(self, *args):
AllTests = type("AllTests", (object,), d)
del d, statictests, method, method_template

@support.thread_unsafe("callLst is shared between threads")
class ClassTests(unittest.TestCase):
def setUp(self):
callLst[:] = []
Expand Down
3 changes: 3 additions & 0 deletions Lib/test/test_descr.py
Original file line number Diff line number Diff line change
Expand Up @@ -1103,6 +1103,7 @@ class MyFrozenSet(frozenset):
with self.assertRaises(TypeError):
frozenset().__class__ = MyFrozenSet

@support.thread_unsafe
def test_slots(self):
# Testing __slots__...
class C0(object):
Expand Down Expand Up @@ -5485,6 +5486,7 @@ def __repr__(self):
{pickle.dumps, pickle._dumps},
{pickle.loads, pickle._loads}))

@support.thread_unsafe
def test_pickle_slots(self):
# Tests pickling of classes with __slots__.

Expand Down Expand Up @@ -5552,6 +5554,7 @@ class E(C):
y = pickle_copier.copy(x)
self._assert_is_copy(x, y)

@support.thread_unsafe
def test_reduce_copying(self):
# Tests pickling and copying new-style classes and objects.
global C1
Expand Down
1 change: 1 addition & 0 deletions Lib/test/test_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -666,6 +666,7 @@ class COperatorTestCase(OperatorTestCase, unittest.TestCase):
module = c_operator


@support.thread_unsafe("swaps global operator module")
class OperatorPickleTestCase:
def copy(self, obj, proto):
with support.swap_item(sys.modules, 'operator', self.module):
Expand Down
1 change: 1 addition & 0 deletions Lib/test/test_tokenize.py
Original file line number Diff line number Diff line change
Expand Up @@ -1538,6 +1538,7 @@ def test_false_encoding(self):
self.assertEqual(encoding, 'utf-8')
self.assertEqual(consumed_lines, [b'print("#coding=fake")'])

@support.thread_unsafe
def test_open(self):
filename = os_helper.TESTFN + '.py'
self.addCleanup(os_helper.unlink, filename)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
Add an option ``--parallel-threads=N`` to the regression test runner that
runs individual tests in multiple threads in parallel in order to find
concurrency bugs. Note that most of the test suite is not yet reviewed for
thread-safety or annotated with ``@thread_unsafe`` when necessary.
Loading