bpo-31540 Add context management for concurrent.futures.ProcessPoolExecutor #3682
Merged
Changes from all commits (9 commits):

- 6376291 Add context management for ProcessPoolExecutor+CLN (tomMoral)
- e787b6f FIX skip tests that re-run the entire test suite (tomMoral)
- 926ad27 NEW add whatsnew entry (tomMoral)
- a945891 CLN contex->mp_context+FIX context in test (tomMoral)
- 7a093f0 TST add test_ressources_gced_in_workers (tomMoral)
- 3953ee3 DOC update ProcessPoolExecutor doc (tomMoral)
- 9ac6305 FIX doc context + use assertTrue (tomMoral)
- 6e4104b Doc nit: simplify sentence (pitrou)
- 8ec5ab4 NEWS nits (pitrou)
@@ -50,8 +50,7 @@
 from concurrent.futures import _base
 import queue
 from queue import Full
-import multiprocessing
-from multiprocessing import SimpleQueue
+import multiprocessing as mp
 from multiprocessing.connection import wait
 import threading
 import weakref
@@ -74,11 +73,11 @@
 # threads/processes finish.

 _threads_queues = weakref.WeakKeyDictionary()
-_shutdown = False
+_global_shutdown = False

 def _python_exit():
-    global _shutdown
-    _shutdown = True
+    global _global_shutdown
+    _global_shutdown = True
     items = list(_threads_queues.items())
     for t, q in items:
         q.put(None)
@@ -158,12 +157,10 @@ def _process_worker(call_queue, result_queue):
     This worker is run in a separate process.

     Args:
-        call_queue: A multiprocessing.Queue of _CallItems that will be read and
+        call_queue: A ctx.Queue of _CallItems that will be read and
             evaluated by the worker.
-        result_queue: A multiprocessing.Queue of _ResultItems that will written
+        result_queue: A ctx.Queue of _ResultItems that will written
             to by the worker.
-        shutdown: A multiprocessing.Event that will be set as a signal to the
-            worker that it should exit when call_queue is empty.
     """
     while True:
         call_item = call_queue.get(block=True)
@@ -180,6 +177,11 @@ def _process_worker(call_queue, result_queue):
             result_queue.put(_ResultItem(call_item.work_id,
                                          result=r))

+        # Liberate the resource as soon as possible, to avoid holding onto
+        # open files or shared memory that is not needed anymore
+        del call_item


 def _add_call_item_to_queue(pending_work_items,
                             work_ids,
                             call_queue):

Review comment on the added `del call_item` lines: Would it be easy to add a test for this?

Reply: I added a test for this behavior.
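For readers following along, here is a minimal sketch of how such a test can be written. Commit 7a093f0 adds the real one (test_ressources_gced_in_workers); the helper class, test name, and the use of a manager Event proxy below are illustrative assumptions, not the committed test. The idea is that the worker's unpickled copy of the argument can only be collected before shutdown if the worker drops its reference to the finished call item.

```python
import multiprocessing as mp
from concurrent.futures import ProcessPoolExecutor


class SetEventOnDel:
    """Illustrative helper: sets a shared event when it is garbage collected.

    Must live at module level so the spawned worker can import it when
    unpickling the submitted argument.
    """
    def __init__(self, event):
        self.event = event          # a Manager().Event() proxy, picklable

    def __del__(self):
        self.event.set()


def test_call_item_released_in_worker():
    ctx = mp.get_context("spawn")
    with mp.Manager() as manager:
        event = manager.Event()
        obj = SetEventOnDel(event)  # keep the parent's copy alive on purpose
        with ProcessPoolExecutor(max_workers=1, mp_context=ctx) as executor:
            executor.submit(id, obj).result()
            # Only the worker's unpickled copy of ``obj`` can die here; it
            # does so as soon as the worker drops the finished call item.
            assert event.wait(timeout=5)
```

Without the `del call_item` added above, the worker keeps the argument alive while it blocks on the next `call_queue.get()`, so the event would not be set within the timeout.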
@@ -231,20 +233,21 @@ def _queue_management_worker(executor_reference,
         executor_reference: A weakref.ref to the ProcessPoolExecutor that owns
             this thread. Used to determine if the ProcessPoolExecutor has been
             garbage collected and that this function can exit.
-        process: A list of the multiprocessing.Process instances used as
+        process: A list of the ctx.Process instances used as
             workers.
         pending_work_items: A dict mapping work ids to _WorkItems e.g.
            {5: <_WorkItem...>, 6: <_WorkItem...>, ...}
         work_ids_queue: A queue.Queue of work ids e.g. Queue([5, 6, ...]).
-        call_queue: A multiprocessing.Queue that will be filled with _CallItems
+        call_queue: A ctx.Queue that will be filled with _CallItems
             derived from _WorkItems for processing by the process workers.
-        result_queue: A multiprocessing.Queue of _ResultItems generated by the
+        result_queue: A ctx.SimpleQueue of _ResultItems generated by the
             process workers.
     """
     executor = None

     def shutting_down():
-        return _shutdown or executor is None or executor._shutdown_thread
+        return (_global_shutdown or executor is None
+                or executor._shutdown_thread)

     def shutdown_worker():
         # This is an upper bound
@@ -254,7 +257,7 @@ def shutdown_worker():
         # Release the queue's resources as soon as possible.
         call_queue.close()
         # If .join() is not called on the created processes then
-        # some multiprocessing.Queue methods may deadlock on Mac OS X.
+        # some ctx.Queue methods may deadlock on Mac OS X.
         for p in processes.values():
             p.join()

@@ -377,13 +380,15 @@ class BrokenProcessPool(RuntimeError):


 class ProcessPoolExecutor(_base.Executor):
-    def __init__(self, max_workers=None):
+    def __init__(self, max_workers=None, mp_context=None):
         """Initializes a new ProcessPoolExecutor instance.

         Args:
             max_workers: The maximum number of processes that can be used to
                 execute the given calls. If None or not given then as many
                 worker processes will be created as the machine has processors.
+            mp_context: A multiprocessing context to launch the workers. This
+                object should provide SimpleQueue, Queue and Process.
         """
         _check_system_limits()

@@ -394,17 +399,20 @@ def __init__(self, max_workers=None):
                 raise ValueError("max_workers must be greater than 0")

             self._max_workers = max_workers
+        if mp_context is None:
+            mp_context = mp.get_context()
+        self._mp_context = mp_context

         # Make the call queue slightly larger than the number of processes to
         # prevent the worker processes from idling. But don't make it too big
         # because futures in the call queue cannot be cancelled.
-        self._call_queue = multiprocessing.Queue(self._max_workers +
-                                                 EXTRA_QUEUED_CALLS)
+        queue_size = self._max_workers + EXTRA_QUEUED_CALLS
+        self._call_queue = mp_context.Queue(queue_size)
         # Killed worker processes can produce spurious "broken pipe"
         # tracebacks in the queue's own worker thread. But we detect killed
         # processes anyway, so silence the tracebacks.
         self._call_queue._ignore_epipe = True
-        self._result_queue = SimpleQueue()
+        self._result_queue = mp_context.SimpleQueue()
         self._work_ids = queue.Queue()
         self._queue_management_thread = None
         # Map of pids to processes
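For context, this is how the new keyword argument is intended to be used once the change lands. A small illustrative script, not part of the diff; the `square` helper and the choice of the "spawn" start method are assumptions for the example:

```python
import multiprocessing as mp
from concurrent.futures import ProcessPoolExecutor


def square(x):
    return x * x


if __name__ == "__main__":
    # Default behaviour is unchanged: the executor falls back to
    # mp.get_context(), i.e. the platform's default start method.
    with ProcessPoolExecutor(max_workers=2) as executor:
        print(list(executor.map(square, range(4))))

    # With this PR, the start method can be chosen per executor by passing
    # a multiprocessing context ("spawn", "fork" or "forkserver").
    ctx = mp.get_context("spawn")
    with ProcessPoolExecutor(max_workers=2, mp_context=ctx) as executor:
        print(list(executor.map(square, range(4))))
```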
@@ -426,23 +434,23 @@ def weakref_cb(_, q=self._result_queue):
             # Start the processes so that their sentinels are known.
             self._adjust_process_count()
             self._queue_management_thread = threading.Thread(
-                    target=_queue_management_worker,
-                    args=(weakref.ref(self, weakref_cb),
-                          self._processes,
-                          self._pending_work_items,
-                          self._work_ids,
-                          self._call_queue,
-                          self._result_queue))
+                target=_queue_management_worker,
+                args=(weakref.ref(self, weakref_cb),
+                      self._processes,
+                      self._pending_work_items,
+                      self._work_ids,
+                      self._call_queue,
+                      self._result_queue))
             self._queue_management_thread.daemon = True
             self._queue_management_thread.start()
             _threads_queues[self._queue_management_thread] = self._result_queue

     def _adjust_process_count(self):
         for _ in range(len(self._processes), self._max_workers):
-            p = multiprocessing.Process(
-                    target=_process_worker,
-                    args=(self._call_queue,
-                          self._result_queue))
+            p = self._mp_context.Process(
+                target=_process_worker,
+                args=(self._call_queue,
+                      self._result_queue))
             p.start()
             self._processes[p.pid] = p

Review comment: I forgot the rationale for this?

Reply: When coverage is used with multiprocessing, spawning a new interpreter launches a new test session, resulting in a mess. I think it previously worked with the fork context, but when I launch the tests with the three start methods it also launches new test sessions. I therefore disabled the coverage tests for test_concurrent_futures. I am not sure what I should do if this is not the case. The duplicated test sessions could result from some command-line argument parsing in either the semaphore tracker or the forkserver, but I do not think it is linked to this PR. Let me know if this makes sense.

Review comment: It does make sense, thank you.