Skip to content

Commit

Permalink
pythongh-109047: concurrent.futures catches PythonFinalizationError
Browse files Browse the repository at this point in the history
concurrent.futures: The 'executor manager thread' now catches
PythonFinalizationError, it calls terminate_broken(). The exception
occurs while Python is being finalized when adding an item to the
'call queue' tries to create a new 'queue feeder' thread.

Add test_python_finalization_error() to test_concurrent_futures.

concurrent.futures._ExecutorManagerThread changes:

* terminate_broken() no longer calls shutdown_workers() since the
  queue is no longer working anymore (read and write ends of the
  queue pipe are closed).
* wait_result_broken_or_wakeup() now uses the short form of
  traceback.format_exception().

multiprocessing.Queue changes:

* Add _terminate_broken() method.
* _start_thread() starts _thread to None on exception to prevent
  leaking "dangling threads" even if the thread was not started
  yet.
  • Loading branch information
vstinner committed Sep 25, 2023
1 parent 6d969f3 commit ab2a33b
Show file tree
Hide file tree
Showing 4 changed files with 63 additions and 17 deletions.
27 changes: 15 additions & 12 deletions Lib/concurrent/futures/process.py
Original file line number Diff line number Diff line change
Expand Up @@ -341,7 +341,12 @@ def run(self):
# Main loop for the executor manager thread.

while True:
self.add_call_item_to_queue()
try:
self.add_call_item_to_queue()
except PythonFinalizationError as exc:
cause = format_exception(exc)
self.terminate_broken(cause)
return

result_item, is_broken, cause = self.wait_result_broken_or_wakeup()

Expand Down Expand Up @@ -425,8 +430,8 @@ def wait_result_broken_or_wakeup(self):
try:
result_item = result_reader.recv()
is_broken = False
except BaseException as e:
cause = format_exception(type(e), e, e.__traceback__)
except BaseException as exc:
cause = format_exception(exc)

elif wakeup_reader in ready:
is_broken = False
Expand Down Expand Up @@ -515,16 +520,10 @@ def terminate_broken(self, cause):
for p in self.processes.values():
p.terminate()

# Prevent queue writing to a pipe which is no longer read.
# https://github.com/python/cpython/issues/94777
self.call_queue._reader.close()

# gh-107219: Close the connection writer which can unblock
# Queue._feed() if it was stuck in send_bytes().
self.call_queue._writer.close()
self.call_queue._terminate_broken()

# clean up resources
self.join_executor_internals()
self._join_executor_internals(broken=True)

def flag_executor_shutting_down(self):
# Flag the executor as shutting down and cancel remaining tasks if
Expand Down Expand Up @@ -567,7 +566,11 @@ def shutdown_workers(self):
break

def join_executor_internals(self):
self.shutdown_workers()
self._join_executor_internals()

def _join_executor_internals(self, broken=False):
if not broken:
self.shutdown_workers()
# Release the queue's resources as soon as possible.
self.call_queue.close()
self.call_queue.join_thread()
Expand Down
27 changes: 22 additions & 5 deletions Lib/multiprocessing/queues.py
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,19 @@ def cancel_join_thread(self):
except AttributeError:
pass

def _terminate_broken(self):
# Close a Queue on error.

# gh-94777: Prevent queue writing to a pipe which is no longer read.
self._reader.close()

# gh-107219: Close the connection writer which can unblock
# Queue._feed() if it was stuck in send_bytes().
self._writer.close()

self.close()
self.join_thread()

def _start_thread(self):
debug('Queue._start_thread()')

Expand All @@ -169,13 +182,17 @@ def _start_thread(self):
self._wlock, self._reader.close, self._writer.close,
self._ignore_epipe, self._on_queue_feeder_error,
self._sem),
name='QueueFeederThread'
name='QueueFeederThread',
daemon=True,
)
self._thread.daemon = True

debug('doing self._thread.start()')
self._thread.start()
debug('... done self._thread.start()')
try:
debug('doing self._thread.start()')
self._thread.start()
debug('... done self._thread.start()')
except:
self._thread = None
raise

if not self._joincancelled:
self._jointhread = Finalize(
Expand Down
22 changes: 22 additions & 0 deletions Lib/test/test_concurrent_futures/test_process_pool.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import os
import sys
import threading
import time
import unittest
from concurrent import futures
Expand Down Expand Up @@ -187,6 +188,27 @@ def test_max_tasks_early_shutdown(self):
for i, future in enumerate(futures):
self.assertEqual(future.result(), mul(i, i))

def test_python_finalization_error(self):
context = self.get_context()

# Create _ExecutorManagerThread, but block QueueFeederThread
orig_start_new_thread = threading._start_new_thread
nthread = 0
def mock_start_new_thread(func, *args):
nonlocal nthread
if nthread >= 1:
raise PythonFinalizationError()
nthread += 1
return orig_start_new_thread(func, *args)

with support.swap_attr(threading, '_start_new_thread',
mock_start_new_thread):
executor = self.executor_type(max_workers=2, mp_context=context)
with executor:
with self.assertRaises(BrokenProcessPool):
list(executor.map(mul, [(2, 3)] * 10))
executor.shutdown()


create_executor_tests(globals(), ProcessPoolExecutorTest,
executor_mixins=(ProcessPoolForkMixin,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
:mod:`concurrent.futures`: The *executor manager thread* now catches
:exc:`PythonFinalizationError`, it calls ``terminate_broken()``. The exception
occurs while Python is being finalized when adding an item to the *call queue*
tries to create a new *queue feeder* thread. Patch by Victor Stinner.

0 comments on commit ab2a33b

Please sign in to comment.