Skip to content

Commit 356de02

Browse files
authored
[3.12] gh-109047: concurrent.futures catches RuntimeError (#109810) (#110126)
gh-109047: concurrent.futures catches PythonFinalizationError (#109810) concurrent.futures: The *executor manager thread* now catches exceptions when adding an item to the *call queue*. During Python finalization, creating a new thread can now raise RuntimeError. Catch the exception and call terminate_broken() in this case. Add test_python_finalization_error() to test_concurrent_futures. concurrent.futures._ExecutorManagerThread changes: * terminate_broken() no longer calls shutdown_workers() since the call queue is no longer working anymore (read and write ends of the queue pipe are closed). * terminate_broken() now terminates child processes, not only wait until they complete. * _ExecutorManagerThread.terminate_broken() now holds shutdown_lock to prevent race conditons with ProcessPoolExecutor.submit(). multiprocessing.Queue changes: * Add _terminate_broken() method. * _start_thread() sets _thread to None on exception to prevent leaking "dangling threads" even if the thread was not started yet. (cherry picked from commit 6351842)
1 parent 41eb0c7 commit 356de02

File tree

4 files changed

+90
-17
lines changed

4 files changed

+90
-17
lines changed

Lib/concurrent/futures/process.py

Lines changed: 37 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -341,7 +341,14 @@ def run(self):
341341
# Main loop for the executor manager thread.
342342

343343
while True:
344-
self.add_call_item_to_queue()
344+
# gh-109047: During Python finalization, self.call_queue.put()
345+
# creation of a thread can fail with RuntimeError.
346+
try:
347+
self.add_call_item_to_queue()
348+
except BaseException as exc:
349+
cause = format_exception(exc)
350+
self.terminate_broken(cause)
351+
return
345352

346353
result_item, is_broken, cause = self.wait_result_broken_or_wakeup()
347354

@@ -425,8 +432,8 @@ def wait_result_broken_or_wakeup(self):
425432
try:
426433
result_item = result_reader.recv()
427434
is_broken = False
428-
except BaseException as e:
429-
cause = format_exception(type(e), e, e.__traceback__)
435+
except BaseException as exc:
436+
cause = format_exception(exc)
430437

431438
elif wakeup_reader in ready:
432439
is_broken = False
@@ -473,7 +480,7 @@ def is_shutting_down(self):
473480
return (_global_shutdown or executor is None
474481
or executor._shutdown_thread)
475482

476-
def terminate_broken(self, cause):
483+
def _terminate_broken(self, cause):
477484
# Terminate the executor because it is in a broken state. The cause
478485
# argument can be used to display more information on the error that
479486
# lead the executor into becoming broken.
@@ -498,7 +505,14 @@ def terminate_broken(self, cause):
498505

499506
# Mark pending tasks as failed.
500507
for work_id, work_item in self.pending_work_items.items():
501-
work_item.future.set_exception(bpe)
508+
try:
509+
work_item.future.set_exception(bpe)
510+
except _base.InvalidStateError:
511+
# set_exception() fails if the future is cancelled: ignore it.
512+
# Trying to check if the future is cancelled before calling
513+
# set_exception() would leave a race condition if the future is
514+
# cancelled between the check and set_exception().
515+
pass
502516
# Delete references to object. See issue16284
503517
del work_item
504518
self.pending_work_items.clear()
@@ -508,16 +522,18 @@ def terminate_broken(self, cause):
508522
for p in self.processes.values():
509523
p.terminate()
510524

511-
# Prevent queue writing to a pipe which is no longer read.
512-
# https://github.com/python/cpython/issues/94777
513-
self.call_queue._reader.close()
525+
self.call_queue._terminate_broken()
514526

515527
# gh-107219: Close the connection writer which can unblock
516528
# Queue._feed() if it was stuck in send_bytes().
517529
self.call_queue._writer.close()
518530

519531
# clean up resources
520-
self.join_executor_internals()
532+
self._join_executor_internals(broken=True)
533+
534+
def terminate_broken(self, cause):
535+
with self.shutdown_lock:
536+
self._terminate_broken(cause)
521537

522538
def flag_executor_shutting_down(self):
523539
# Flag the executor as shutting down and cancel remaining tasks if
@@ -560,15 +576,24 @@ def shutdown_workers(self):
560576
break
561577

562578
def join_executor_internals(self):
563-
self.shutdown_workers()
579+
with self.shutdown_lock:
580+
self._join_executor_internals()
581+
582+
def _join_executor_internals(self, broken=False):
583+
# If broken, call_queue was closed and so can no longer be used.
584+
if not broken:
585+
self.shutdown_workers()
586+
564587
# Release the queue's resources as soon as possible.
565588
self.call_queue.close()
566589
self.call_queue.join_thread()
567-
with self.shutdown_lock:
568-
self.thread_wakeup.close()
590+
self.thread_wakeup.close()
591+
569592
# If .join() is not called on the created processes then
570593
# some ctx.Queue methods may deadlock on Mac OS X.
571594
for p in self.processes.values():
595+
if broken:
596+
p.terminate()
572597
p.join()
573598

574599
def get_n_children_alive(self):

Lib/multiprocessing/queues.py

Lines changed: 20 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -158,6 +158,15 @@ def cancel_join_thread(self):
158158
except AttributeError:
159159
pass
160160

161+
def _terminate_broken(self):
162+
# Close a Queue on error.
163+
164+
# gh-94777: Prevent queue writing to a pipe which is no longer read.
165+
self._reader.close()
166+
167+
self.close()
168+
self.join_thread()
169+
161170
def _start_thread(self):
162171
debug('Queue._start_thread()')
163172

@@ -169,13 +178,19 @@ def _start_thread(self):
169178
self._wlock, self._reader.close, self._writer.close,
170179
self._ignore_epipe, self._on_queue_feeder_error,
171180
self._sem),
172-
name='QueueFeederThread'
181+
name='QueueFeederThread',
182+
daemon=True,
173183
)
174-
self._thread.daemon = True
175184

176-
debug('doing self._thread.start()')
177-
self._thread.start()
178-
debug('... done self._thread.start()')
185+
try:
186+
debug('doing self._thread.start()')
187+
self._thread.start()
188+
debug('... done self._thread.start()')
189+
except:
190+
# gh-109047: During Python finalization, creating a thread
191+
# can fail with RuntimeError.
192+
self._thread = None
193+
raise
179194

180195
if not self._joincancelled:
181196
self._jointhread = Finalize(

Lib/test/test_concurrent_futures/test_process_pool.py

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import os
22
import sys
3+
import threading
34
import time
45
import unittest
56
from concurrent import futures
@@ -187,6 +188,34 @@ def test_max_tasks_early_shutdown(self):
187188
for i, future in enumerate(futures):
188189
self.assertEqual(future.result(), mul(i, i))
189190

191+
def test_python_finalization_error(self):
192+
# gh-109047: Catch RuntimeError on thread creation
193+
# during Python finalization.
194+
195+
context = self.get_context()
196+
197+
# gh-109047: Mock the threading.start_new_thread() function to inject
198+
# RuntimeError: simulate the error raised during Python finalization.
199+
# Block the second creation: create _ExecutorManagerThread, but block
200+
# QueueFeederThread.
201+
orig_start_new_thread = threading._start_new_thread
202+
nthread = 0
203+
def mock_start_new_thread(func, *args):
204+
nonlocal nthread
205+
if nthread >= 1:
206+
raise RuntimeError("can't create new thread at "
207+
"interpreter shutdown")
208+
nthread += 1
209+
return orig_start_new_thread(func, *args)
210+
211+
with support.swap_attr(threading, '_start_new_thread',
212+
mock_start_new_thread):
213+
executor = self.executor_type(max_workers=2, mp_context=context)
214+
with executor:
215+
with self.assertRaises(BrokenProcessPool):
216+
list(executor.map(mul, [(2, 3)] * 10))
217+
executor.shutdown()
218+
190219

191220
create_executor_tests(globals(), ProcessPoolExecutorTest,
192221
executor_mixins=(ProcessPoolForkMixin,
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
:mod:`concurrent.futures`: The *executor manager thread* now catches exceptions
2+
when adding an item to the *call queue*. During Python finalization, creating a
3+
new thread can now raise :exc:`RuntimeError`. Catch the exception and call
4+
``terminate_broken()`` in this case. Patch by Victor Stinner.

0 commit comments

Comments
 (0)