
bpo-31540 Add context management for concurrent.futures.ProcessPoolExecutor #3682

Merged 9 commits on Oct 3, 2017

2 changes: 1 addition & 1 deletion .travis.yml
@@ -56,7 +56,7 @@ matrix:
./venv/bin/python -m test.pythoninfo
script:
# Skip tests that re-run the entire test suite.
- ./venv/bin/python -m coverage run --pylib -m test --fail-env-changed -uall,-cpu -x test_multiprocessing_fork -x test_multiprocessing_forkserver -x test_multiprocessing_spawn
- ./venv/bin/python -m coverage run --pylib -m test --fail-env-changed -uall,-cpu -x test_multiprocessing_fork -x test_multiprocessing_forkserver -x test_multiprocessing_spawn -x test_concurrent_futures
Member:

I forgot the rationale for this?

Contributor Author:

When coverage is used with multiprocessing, spawning a new interpreter launches a new test session, which results in a mess. I think this previously worked with the fork context, but when I run the tests with all three start methods, new test sessions are launched as well.

Thus, I disabled coverage for test_concurrent_futures. I am not sure what I should do if this approach is not acceptable. The duplicated test sessions could result from command-line argument parsing in either the semaphore tracker or the forkserver, but I do not think it is linked to this PR.

Let me know if this makes sense.

Member:

It does make sense, thank you.
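
For context, a hedged toy illustration (not part of this PR) of one way duplicate test sessions can appear: with the "spawn" and "forkserver" start methods, the child process re-runs the parent's main module, so any module-level code that is not guarded by `if __name__ == "__main__":` executes again in every worker. If the test runner's main module is similarly unguarded, re-importing it would start a fresh session, which would explain the duplicates the coverage run sees.

```python
# Toy example (not from this PR): with the "spawn" start method each child
# re-imports the parent's main module, so unguarded module-level code runs
# again in every worker process.
import multiprocessing as mp

print("module-level code: runs in the parent and in every spawned child")

def square(x):
    return x * x

if __name__ == "__main__":
    ctx = mp.get_context("spawn")
    with ctx.Pool(processes=2) as pool:
        # The print above appears again for each spawned worker.
        print(pool.map(square, range(4)))
```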

after_script: # Probably should be after_success once test suite updated to run under coverage.py.
# Make the `coverage` command available to Codecov w/ a version of Python that can parse all source files.
- source ./venv/bin/activate
9 changes: 8 additions & 1 deletion Doc/library/concurrent.futures.rst
@@ -191,20 +191,27 @@ that :class:`ProcessPoolExecutor` will not work in the interactive interpreter.
Calling :class:`Executor` or :class:`Future` methods from a callable submitted
to a :class:`ProcessPoolExecutor` will result in deadlock.

.. class:: ProcessPoolExecutor(max_workers=None)
.. class:: ProcessPoolExecutor(max_workers=None, mp_context=None)

An :class:`Executor` subclass that executes calls asynchronously using a pool
of at most *max_workers* processes. If *max_workers* is ``None`` or not
given, it will default to the number of processors on the machine.
If *max_workers* is lower or equal to ``0``, then a :exc:`ValueError`
will be raised.
*mp_context* can be a multiprocessing context or None. It will be used to
launch the workers. If *mp_context* is ``None`` or not given, the default
multiprocessing context is used.

.. versionchanged:: 3.3
When one of the worker processes terminates abruptly, a
:exc:`BrokenProcessPool` error is now raised. Previously, behaviour
was undefined but operations on the executor or its futures would often
freeze or deadlock.

.. versionchanged:: 3.7
The *mp_context* argument was added to allow users to control the
start_method for worker processes created by the pool.


.. _processpoolexecutor-example:

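As a reader's aid, a minimal usage sketch of the documented `mp_context` parameter; `cpu_bound` is a placeholder function, not something from this patch.

```python
# Minimal sketch of the new mp_context parameter (cpu_bound is a placeholder).
import multiprocessing as mp
from concurrent.futures import ProcessPoolExecutor

def cpu_bound(n):
    return sum(i * i for i in range(n))

if __name__ == "__main__":
    # Pick an explicit start method instead of the platform default.
    # "spawn" exists on all platforms; "fork" and "forkserver" are Unix-only.
    ctx = mp.get_context("spawn")
    with ProcessPoolExecutor(max_workers=2, mp_context=ctx) as executor:
        print(list(executor.map(cpu_bound, [10_000, 20_000])))
```
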
66 changes: 37 additions & 29 deletions Lib/concurrent/futures/process.py
@@ -50,8 +50,7 @@
from concurrent.futures import _base
import queue
from queue import Full
import multiprocessing
from multiprocessing import SimpleQueue
import multiprocessing as mp
from multiprocessing.connection import wait
import threading
import weakref
@@ -74,11 +73,11 @@
# threads/processes finish.

_threads_queues = weakref.WeakKeyDictionary()
_shutdown = False
_global_shutdown = False

def _python_exit():
global _shutdown
_shutdown = True
global _global_shutdown
_global_shutdown = True
items = list(_threads_queues.items())
for t, q in items:
q.put(None)
@@ -158,12 +157,10 @@ def _process_worker(call_queue, result_queue):
This worker is run in a separate process.

Args:
call_queue: A multiprocessing.Queue of _CallItems that will be read and
call_queue: A ctx.Queue of _CallItems that will be read and
evaluated by the worker.
result_queue: A multiprocessing.Queue of _ResultItems that will written
result_queue: A ctx.Queue of _ResultItems that will written
to by the worker.
shutdown: A multiprocessing.Event that will be set as a signal to the
worker that it should exit when call_queue is empty.
"""
while True:
call_item = call_queue.get(block=True)
@@ -180,6 +177,11 @@ def _process_worker(call_queue, result_queue):
result_queue.put(_ResultItem(call_item.work_id,
result=r))

# Liberate the resource as soon as possible, to avoid holding onto
Member:

Would it be easy to add a test for this?

Contributor Author:

I added a test for this behavior.

# open files or shared memory that is not needed anymore
del call_item


def _add_call_item_to_queue(pending_work_items,
work_ids,
call_queue):
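
Aside on the change above: a hedged sketch (not the regression test added by this PR) of how the worker-side release performed by `del call_item` could be observed. The parent keeps its own reference alive, so only the worker's unpickled copy can trigger `__del__`; `GCNotifier` is a made-up helper for this illustration.

```python
# Hedged sketch: verify the worker drops its copy of the submitted argument
# once the call is done (the point of `del call_item` above).
import multiprocessing as mp
from concurrent.futures import ProcessPoolExecutor

class GCNotifier:
    """Sets a manager Event from __del__ when an instance is collected."""
    def __init__(self, event):
        self.event = event

    def __del__(self):
        self.event.set()

if __name__ == "__main__":
    ctx = mp.get_context("spawn")
    mgr = ctx.Manager()                  # manager proxies survive pickling
    obj = GCNotifier(mgr.Event())
    with ProcessPoolExecutor(max_workers=1, mp_context=ctx) as executor:
        executor.submit(id, obj).result()
        # The parent still holds `obj`, so only the worker's copy can have
        # triggered __del__ at this point.
        assert obj.event.wait(timeout=5)
    del obj          # drop the parent's copy before the manager shuts down
    mgr.shutdown()
```
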
@@ -231,20 +233,21 @@ def _queue_management_worker(executor_reference,
executor_reference: A weakref.ref to the ProcessPoolExecutor that owns
this thread. Used to determine if the ProcessPoolExecutor has been
garbage collected and that this function can exit.
process: A list of the multiprocessing.Process instances used as
process: A list of the ctx.Process instances used as
workers.
pending_work_items: A dict mapping work ids to _WorkItems e.g.
{5: <_WorkItem...>, 6: <_WorkItem...>, ...}
work_ids_queue: A queue.Queue of work ids e.g. Queue([5, 6, ...]).
call_queue: A multiprocessing.Queue that will be filled with _CallItems
call_queue: A ctx.Queue that will be filled with _CallItems
derived from _WorkItems for processing by the process workers.
result_queue: A multiprocessing.Queue of _ResultItems generated by the
result_queue: A ctx.SimpleQueue of _ResultItems generated by the
process workers.
"""
executor = None

def shutting_down():
return _shutdown or executor is None or executor._shutdown_thread
return (_global_shutdown or executor is None
or executor._shutdown_thread)

def shutdown_worker():
# This is an upper bound
@@ -254,7 +257,7 @@ def shutdown_worker():
# Release the queue's resources as soon as possible.
call_queue.close()
# If .join() is not called on the created processes then
# some multiprocessing.Queue methods may deadlock on Mac OS X.
# some ctx.Queue methods may deadlock on Mac OS X.
for p in processes.values():
p.join()

@@ -377,13 +380,15 @@ class BrokenProcessPool(RuntimeError):


class ProcessPoolExecutor(_base.Executor):
def __init__(self, max_workers=None):
def __init__(self, max_workers=None, mp_context=None):
"""Initializes a new ProcessPoolExecutor instance.

Args:
max_workers: The maximum number of processes that can be used to
execute the given calls. If None or not given then as many
worker processes will be created as the machine has processors.
mp_context: A multiprocessing context to launch the workers. This
object should provide SimpleQueue, Queue and Process.
"""
_check_system_limits()

@@ -394,17 +399,20 @@ def __init__(self, max_workers=None):
raise ValueError("max_workers must be greater than 0")

self._max_workers = max_workers
if mp_context is None:
mp_context = mp.get_context()
self._mp_context = mp_context

# Make the call queue slightly larger than the number of processes to
# prevent the worker processes from idling. But don't make it too big
# because futures in the call queue cannot be cancelled.
self._call_queue = multiprocessing.Queue(self._max_workers +
EXTRA_QUEUED_CALLS)
queue_size = self._max_workers + EXTRA_QUEUED_CALLS
self._call_queue = mp_context.Queue(queue_size)
# Killed worker processes can produce spurious "broken pipe"
# tracebacks in the queue's own worker thread. But we detect killed
# processes anyway, so silence the tracebacks.
self._call_queue._ignore_epipe = True
self._result_queue = SimpleQueue()
self._result_queue = mp_context.SimpleQueue()
self._work_ids = queue.Queue()
self._queue_management_thread = None
# Map of pids to processes
@@ -426,23 +434,23 @@ def weakref_cb(_, q=self._result_queue):
# Start the processes so that their sentinels are known.
self._adjust_process_count()
self._queue_management_thread = threading.Thread(
target=_queue_management_worker,
args=(weakref.ref(self, weakref_cb),
self._processes,
self._pending_work_items,
self._work_ids,
self._call_queue,
self._result_queue))
target=_queue_management_worker,
args=(weakref.ref(self, weakref_cb),
self._processes,
self._pending_work_items,
self._work_ids,
self._call_queue,
self._result_queue))
self._queue_management_thread.daemon = True
self._queue_management_thread.start()
_threads_queues[self._queue_management_thread] = self._result_queue

def _adjust_process_count(self):
for _ in range(len(self._processes), self._max_workers):
p = multiprocessing.Process(
target=_process_worker,
args=(self._call_queue,
self._result_queue))
p = self._mp_context.Process(
target=_process_worker,
args=(self._call_queue,
self._result_queue))
p.start()
self._processes[p.pid] = p

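
Finally, the new `__init__` docstring says the context only needs to provide `SimpleQueue`, `Queue` and `Process`. A quick illustrative check (not part of the patch) that every available start method's context satisfies that duck-typed contract:

```python
# Illustrative check: each start method's context exposes the three factories
# that ProcessPoolExecutor relies on.
import multiprocessing as mp

for method in mp.get_all_start_methods():   # e.g. ['fork', 'spawn', 'forkserver']
    ctx = mp.get_context(method)
    for factory in ("Queue", "SimpleQueue", "Process"):
        assert callable(getattr(ctx, factory)), (method, factory)
    print(method, "provides Queue, SimpleQueue and Process")
```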