|
13 | 13 | import logging
|
14 | 14 | import os
|
15 | 15 | import re
|
| 16 | +import multiprocessing |
16 | 17 | import shutil
|
17 | 18 | import subprocess
|
18 | 19 | import signal
|
@@ -162,85 +163,23 @@ def run_multiple_processes(commands,
|
162 | 163 | """
|
163 | 164 | assert not (route_stdout_to_temp_files_suffix and pipe_stdout), 'Cannot simultaneously pipe stdout to file and a string! Choose one or the other.'
|
164 | 165 |
|
| 166 | + global multiprocessing_pool |
| 167 | + |
| 168 | + if not multiprocessing_pool: |
| 169 | + max_workers = get_num_cores() |
| 170 | + if WINDOWS: |
| 171 | +    # On Windows, there is a limit on the total number of tasks which is |
| 172 | +    # enforced by multiprocessing, but only on newer versions of Python: |
| 173 | + # https://bugs.python.org/issue26903 |
| 174 | + # TODO(sbc): This could be removed once we update the min python version to 3.8.0 |
| 175 | + max_workers = min(max_workers, 61) |
| 176 | + multiprocessing_pool = multiprocessing.Pool(processes=max_workers) |
| 177 | + |
165 | 178 | if env is None:
|
166 | 179 | env = os.environ.copy()
|
167 | 180 |
|
168 |
| - # By default, avoid using Python multiprocessing library due to a large amount |
169 |
| - # of bugs it has on Windows (#8013, #718, etc.) |
170 |
| - # Use EM_PYTHON_MULTIPROCESSING=1 environment variable to enable it. It can be |
171 |
| - # faster, but may not work on Windows. |
172 |
| - if int(os.getenv('EM_PYTHON_MULTIPROCESSING', '0')): |
173 |
| - import multiprocessing |
174 |
| - max_workers = get_num_cores() |
175 |
| - global multiprocessing_pool |
176 |
| - if not multiprocessing_pool: |
177 |
| - if WINDOWS: |
178 |
| - # Fix for python < 3.8 on windows. See: https://github.com/python/cpython/pull/13132 |
179 |
| - max_workers = min(max_workers, 61) |
180 |
| - multiprocessing_pool = multiprocessing.Pool(processes=max_workers) |
181 |
| - return multiprocessing_pool.map(mp_run_process, [(cmd, env, route_stdout_to_temp_files_suffix, pipe_stdout, check, cwd) for cmd in commands], chunksize=1) |
182 |
| - |
183 |
| - std_outs = [] |
184 |
| - |
185 |
| - # TODO: Experiment with registering a signal handler here to see if that helps with Ctrl-C locking up the command prompt |
186 |
| - # when multiple child processes have been spawned. |
187 |
| - # import signal |
188 |
| - # def signal_handler(sig, frame): |
189 |
| - # sys.exit(1) |
190 |
| - # signal.signal(signal.SIGINT, signal_handler) |
191 |
| - |
192 |
| - processes = [] |
193 |
| - num_parallel_processes = get_num_cores() |
194 |
| - temp_files = get_temp_files() |
195 |
| - i = 0 |
196 |
| - num_completed = 0 |
197 |
| - |
198 |
| - while num_completed < len(commands): |
199 |
| - if i < len(commands) and len(processes) < num_parallel_processes: |
200 |
| - # Not enough parallel processes running, spawn a new one. |
201 |
| - std_out = temp_files.get(route_stdout_to_temp_files_suffix) if route_stdout_to_temp_files_suffix else (subprocess.PIPE if pipe_stdout else None) |
202 |
| - if DEBUG: |
203 |
| - logger.debug('Running subprocess %d/%d: %s' % (i + 1, len(commands), ' '.join(commands[i]))) |
204 |
| - print_compiler_stage(commands[i]) |
205 |
| - processes += [(i, subprocess.Popen(commands[i], stdout=std_out, stderr=subprocess.PIPE if pipe_stdout else None, env=env, cwd=cwd))] |
206 |
| - if route_stdout_to_temp_files_suffix: |
207 |
| - std_outs += [(i, std_out.name)] |
208 |
| - i += 1 |
209 |
| - else: |
210 |
| - # Not spawning a new process (Too many commands running in parallel, or no commands left): find if a process has finished. |
211 |
| - def get_finished_process(): |
212 |
| - while True: |
213 |
| - j = 0 |
214 |
| - while j < len(processes): |
215 |
| - if processes[j][1].poll() is not None: |
216 |
| - out, err = processes[j][1].communicate() |
217 |
| - return (j, out.decode('UTF-8') if out else '', err.decode('UTF-8') if err else '') |
218 |
| - j += 1 |
219 |
| - # All processes still running; wait a short while for the first (oldest) process to finish, |
220 |
| - # then look again if any process has completed. |
221 |
| - try: |
222 |
| - out, err = processes[0][1].communicate(0.2) |
223 |
| - return (0, out.decode('UTF-8') if out else '', err.decode('UTF-8') if err else '') |
224 |
| - except subprocess.TimeoutExpired: |
225 |
| - pass |
226 |
| - |
227 |
| - j, out, err = get_finished_process() |
228 |
| - idx, finished_process = processes[j] |
229 |
| - del processes[j] |
230 |
| - if pipe_stdout: |
231 |
| - std_outs += [(idx, out)] |
232 |
| - if check and finished_process.returncode != 0: |
233 |
| - if out: |
234 |
| - logger.info(out) |
235 |
| - if err: |
236 |
| - logger.error(err) |
237 |
| - |
238 |
| - raise Exception('Subprocess %d/%d failed (%s)! (cmdline: %s)' % (idx + 1, len(commands), returncode_to_str(finished_process.returncode), shlex_join(commands[idx]))) |
239 |
| - num_completed += 1 |
240 |
| - |
241 |
| - # If processes finished out of order, sort the results to the order of the input. |
242 |
| - std_outs.sort(key=lambda x: x[0]) |
243 |
| - return [x[1] for x in std_outs] |
| 181 | + cmd_list = [(cmd, env, route_stdout_to_temp_files_suffix, pipe_stdout, check, cwd) for cmd in commands] |
| 182 | + return multiprocessing_pool.map(mp_run_process, cmd_list, chunksize=1) |
244 | 183 |
|
245 | 184 |
|
246 | 185 | def check_call(cmd, *args, **kw):
|
|
0 commit comments