Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion em++.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,5 +13,5 @@
try:
sys.exit(emcc.main(sys.argv))
except KeyboardInterrupt:
emcc.logger.warning('KeyboardInterrupt')
emcc.logger.debug('KeyboardInterrupt')
sys.exit(1)
2 changes: 1 addition & 1 deletion emcc.py
Original file line number Diff line number Diff line change
Expand Up @@ -4240,5 +4240,5 @@ def main(args):
try:
sys.exit(main(sys.argv))
except KeyboardInterrupt:
logger.warning('KeyboardInterrupt')
logger.debug('KeyboardInterrupt')
sys.exit(1)
93 changes: 28 additions & 65 deletions test/parallel_testsuite.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,29 +4,34 @@
# found in the LICENSE file.

import multiprocessing
import os
import sys
import unittest
import tempfile
import time
import queue
import unittest

import common


NUM_CORES = None


def g_testing_thread(work_queue, result_queue, temp_dir):
for test in iter(lambda: get_from_queue(work_queue), None):
result = BufferedParallelTestResult()
test.set_temp_dir(temp_dir)
try:
test(result)
except unittest.SkipTest as e:
result.addSkip(test, e)
except Exception as e:
result.addError(test, e)
result_queue.put(result)
def run_test(test):
olddir = os.getcwd()
result = BufferedParallelTestResult()
temp_dir = tempfile.mkdtemp(prefix='emtest_')
test.set_temp_dir(temp_dir)
try:
test(result)
except unittest.SkipTest as e:
result.addSkip(test, e)
except Exception as e:
result.addError(test, e)
# Before attempting to delete the tmp dir make sure the current
# working directory is not within it.
os.chdir(olddir)
common.force_delete_dir(temp_dir)
return result


class ParallelTestSuite(unittest.BaseTestSuite):
Expand All @@ -37,8 +42,6 @@ class ParallelTestSuite(unittest.BaseTestSuite):

def __init__(self, max_cores):
super().__init__()
self.processes = None
self.result_queue = None
self.max_cores = max_cores

def run(self, result):
Expand All @@ -48,19 +51,18 @@ def run(self, result):
# inherited by the child process, but can lead to hard-to-debug windows-only
# issues.
# multiprocessing.set_start_method('spawn')
test_queue = self.create_test_queue()
self.init_processes(test_queue)
results = self.collect_results()
tests = list(self.reversed_tests())
use_cores = min(self.max_cores, len(tests), num_cores())
print('Using %s parallel test processes' % use_cores)
pool = multiprocessing.Pool(use_cores)
results = [pool.apply_async(run_test, (t,)) for t in tests]
results = [r.get() for r in results]
pool.close()
pool.join()
return self.combine_results(result, results)

def create_test_queue(self):
test_queue = multiprocessing.Queue()
for test in self.reversed_tests():
test_queue.put(test)
return test_queue

def reversed_tests(self):
"""A list of this suite's tests in reverse order.
"""A list of this suite's tests, sorted reverse alphabetical order.

Many of the tests in test_core are intentionally named so that long tests
fall toward the end of the alphabet (e.g. test_the_bullet). Tests are
Expand All @@ -69,38 +71,7 @@ def reversed_tests(self):

Future work: measure slowness of tests and sort accordingly.
"""
tests = []
for test in self:
tests.append(test)
tests.sort(key=str)
return tests[::-1]

def init_processes(self, test_queue):
use_cores = min(self.max_cores, num_cores())
print('Using %s parallel test processes' % use_cores)
self.processes = []
self.result_queue = multiprocessing.Queue()
self.dedicated_temp_dirs = [tempfile.mkdtemp() for x in range(use_cores)]
for temp_dir in self.dedicated_temp_dirs:
p = multiprocessing.Process(target=g_testing_thread,
args=(test_queue, self.result_queue, temp_dir))
p.start()
self.processes.append(p)

def collect_results(self):
buffered_results = []
while len(self.processes):
res = get_from_queue(self.result_queue)
if res is not None:
buffered_results.append(res)
else:
self.clear_finished_processes()
for temp_dir in self.dedicated_temp_dirs:
common.force_delete_dir(temp_dir)
return buffered_results

def clear_finished_processes(self):
self.processes = [p for p in self.processes if p.is_alive()]
return reversed(sorted(self, key=str))

def combine_results(self, result, buffered_results):
print()
Expand Down Expand Up @@ -249,11 +220,3 @@ def num_cores():
if NUM_CORES:
return int(NUM_CORES)
return multiprocessing.cpu_count()


def get_from_queue(q):
try:
return q.get(True, 0.1)
except queue.Empty:
pass
return None