-
Notifications
You must be signed in to change notification settings - Fork 3.4k
Always use EM_PYTHON_MULTIPROCESSING after win32 bugfix #17881
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -13,6 +13,7 @@ | |
import logging | ||
import os | ||
import re | ||
import multiprocessing | ||
import shutil | ||
import subprocess | ||
import signal | ||
|
@@ -162,85 +163,23 @@ def run_multiple_processes(commands, | |
""" | ||
assert not (route_stdout_to_temp_files_suffix and pipe_stdout), 'Cannot simultaneously pipe stdout to file and a string! Choose one or the other.' | ||
|
||
global multiprocessing_pool | ||
|
||
if not multiprocessing_pool: | ||
max_workers = get_num_cores() | ||
if WINDOWS: | ||
# On windows, there is a limit on the total number of tasks which is | ||
# enforced by multiprocessing, but only on newer versions of python: | ||
# https://bugs.python.org/issue26903 | ||
# TODO(sbc): This could be removed once we update the min python version to 3.8.0 | ||
max_workers = min(max_workers, 61) | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Is the fix just this, to limit to 61? So this was only ever a problem for machines with >61 cores..? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Yes, that is my understanding. See the python bug for details. I guess for emcc users it would also affect users who set EMCC_CORES > 61.. since you can create more threads than cores if you want to using that variable. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Hmm, I'm a little surprised then that we got bug reports about this. How many people have that many cores..? If we can confirm this works, or we can ask someone that has encountered the bug, that would make me more confident here. But, maybe I'm being overly cautious, if this is in upstream Python it's probably fine..? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Yes, but it makes sense if you look at the original bug report : "On the other Windows system, this issue does not seem to ever occur. The affected system is a 64-thread Threadripper 2990WX, and the numbers are quite close to 64, so I wonder if that has a meaning here. The unaffected system is a 16-thread system." |
||
multiprocessing_pool = multiprocessing.Pool(processes=max_workers) | ||
|
||
if env is None: | ||
env = os.environ.copy() | ||
|
||
# By default, avoid using Python multiprocessing library due to a large amount | ||
# of bugs it has on Windows (#8013, #718, etc.) | ||
# Use EM_PYTHON_MULTIPROCESSING=1 environment variable to enable it. It can be | ||
# faster, but may not work on Windows. | ||
if int(os.getenv('EM_PYTHON_MULTIPROCESSING', '0')): | ||
import multiprocessing | ||
max_workers = get_num_cores() | ||
global multiprocessing_pool | ||
if not multiprocessing_pool: | ||
if WINDOWS: | ||
# Fix for python < 3.8 on windows. See: https://github.com/python/cpython/pull/13132 | ||
max_workers = min(max_workers, 61) | ||
multiprocessing_pool = multiprocessing.Pool(processes=max_workers) | ||
return multiprocessing_pool.map(mp_run_process, [(cmd, env, route_stdout_to_temp_files_suffix, pipe_stdout, check, cwd) for cmd in commands], chunksize=1) | ||
|
||
std_outs = [] | ||
|
||
# TODO: Experiment with registering a signal handler here to see if that helps with Ctrl-C locking up the command prompt | ||
# when multiple child processes have been spawned. | ||
# import signal | ||
# def signal_handler(sig, frame): | ||
# sys.exit(1) | ||
# signal.signal(signal.SIGINT, signal_handler) | ||
|
||
processes = [] | ||
num_parallel_processes = get_num_cores() | ||
temp_files = get_temp_files() | ||
i = 0 | ||
num_completed = 0 | ||
|
||
while num_completed < len(commands): | ||
if i < len(commands) and len(processes) < num_parallel_processes: | ||
# Not enough parallel processes running, spawn a new one. | ||
std_out = temp_files.get(route_stdout_to_temp_files_suffix) if route_stdout_to_temp_files_suffix else (subprocess.PIPE if pipe_stdout else None) | ||
if DEBUG: | ||
logger.debug('Running subprocess %d/%d: %s' % (i + 1, len(commands), ' '.join(commands[i]))) | ||
print_compiler_stage(commands[i]) | ||
processes += [(i, subprocess.Popen(commands[i], stdout=std_out, stderr=subprocess.PIPE if pipe_stdout else None, env=env, cwd=cwd))] | ||
if route_stdout_to_temp_files_suffix: | ||
std_outs += [(i, std_out.name)] | ||
i += 1 | ||
else: | ||
# Not spawning a new process (Too many commands running in parallel, or no commands left): find if a process has finished. | ||
def get_finished_process(): | ||
while True: | ||
j = 0 | ||
while j < len(processes): | ||
if processes[j][1].poll() is not None: | ||
out, err = processes[j][1].communicate() | ||
return (j, out.decode('UTF-8') if out else '', err.decode('UTF-8') if err else '') | ||
j += 1 | ||
# All processes still running; wait a short while for the first (oldest) process to finish, | ||
# then look again if any process has completed. | ||
try: | ||
out, err = processes[0][1].communicate(0.2) | ||
return (0, out.decode('UTF-8') if out else '', err.decode('UTF-8') if err else '') | ||
except subprocess.TimeoutExpired: | ||
pass | ||
|
||
j, out, err = get_finished_process() | ||
idx, finished_process = processes[j] | ||
del processes[j] | ||
if pipe_stdout: | ||
std_outs += [(idx, out)] | ||
if check and finished_process.returncode != 0: | ||
if out: | ||
logger.info(out) | ||
if err: | ||
logger.error(err) | ||
|
||
raise Exception('Subprocess %d/%d failed (%s)! (cmdline: %s)' % (idx + 1, len(commands), returncode_to_str(finished_process.returncode), shlex_join(commands[idx]))) | ||
num_completed += 1 | ||
|
||
# If processes finished out of order, sort the results to the order of the input. | ||
std_outs.sort(key=lambda x: x[0]) | ||
return [x[1] for x in std_outs] | ||
cmd_list = [(cmd, env, route_stdout_to_temp_files_suffix, pipe_stdout, check, cwd) for cmd in commands] | ||
return multiprocessing_pool.map(mp_run_process, cmd_list, chunksize=1) | ||
|
||
|
||
def check_call(cmd, *args, **kw): | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
(this needs to move)