gh-74028: concurrent.futures.Executor.map: introduce buffersize param for lazier behavior #125663

Merged
merged 45 commits into from
Mar 13, 2025
45c3ec5
bpo-29842: concurrent.futures.Executor.map: add buffersize param for …
ebonnal Oct 12, 2024
bfb2c5c
test_map_buffersize: 1s sleep
ebonnal Oct 17, 2024
8539663
mention chunksize in ProcessPoolExecutor's buffersize docstring
ebonnal Oct 18, 2024
022b8c6
merge unittest into ExecutorTest
ebonnal Oct 18, 2024
7ced787
fix versionchanged
ebonnal Oct 18, 2024
cb5f26e
📜🤖 Added by blurb_it.
blurb-it[bot] Oct 18, 2024
f46ebe6
fix tests determinism
ebonnal Oct 18, 2024
eb26e86
add test_map_with_buffersize_on_empty_iterable
ebonnal Oct 18, 2024
0821f95
allow timeout + buffersize
ebonnal Oct 18, 2024
d95c55b
lint import
ebonnal Oct 18, 2024
c80f466
tests: polish
ebonnal Oct 18, 2024
90e6d7c
rephrase docstring
ebonnal Oct 25, 2024
ab91694
fix Doc/library/concurrent.futures.rst
ebonnal Dec 3, 2024
01b8adf
reorder imports
ebonnal Dec 3, 2024
2f8a63f
rephrase buffersize's ValueError
ebonnal Dec 3, 2024
1fb53a5
update 3.14.rst
ebonnal Dec 3, 2024
365c85d
edit docstring
ebonnal Dec 3, 2024
bf5f838
lint
ebonnal Dec 3, 2024
a0057f1
lint
ebonnal Dec 3, 2024
1aa1275
comment on weakref
ebonnal Dec 4, 2024
e0a9a9e
lint
ebonnal Dec 4, 2024
8d6ea97
test_map_with_buffersize_when_buffer_becomes_full: avoid using multip…
ebonnal Dec 4, 2024
6124868
lint
ebonnal Dec 4, 2024
c11276f
test_map_with_buffersize_and_timeout: avoid sleeping 0 seconds for win32
ebonnal Dec 4, 2024
ebb5337
remove test_map_with_buffersize_and_timeout that does not improve cove…
ebonnal Dec 4, 2024
602968c
extend unittesting to no and multiple input iterables
ebonnal Dec 5, 2024
b14e368
Update Lib/concurrent/futures/_base.py
ebonnal Dec 5, 2024
d37ce09
rename args_iter -> zipped_iterables
ebonnal Dec 5, 2024
cdf239c
remove period at end of error message
ebonnal Dec 10, 2024
0a49784
unit tests: merge into a single test method with test messages
ebonnal Dec 16, 2024
178d6fe
apply review on tests format
ebonnal Dec 16, 2024
516a94b
Update Misc/NEWS.d/next/Library/2024-10-18-10-27-54.gh-issue-74028.4d…
ebonnal Dec 16, 2024
ba4ac81
Update Doc/whatsnew/3.14.rst
ebonnal Dec 16, 2024
9588059
use assertListEqual
ebonnal Dec 16, 2024
0427bf1
test_map_buffersize_validation: test negative buffersize
ebonnal Dec 16, 2024
af88fdf
explicitly checks buffersize's type and add test_map_buffersize_type_…
ebonnal Dec 16, 2024
1fcf3fe
test_map_buffersize_on_infinite_iterable: fetch the first 4 elements
ebonnal Dec 18, 2024
0892b2b
add `test_map_buffersize_on_multiple_infinite_iterables`
ebonnal Dec 25, 2024
579ba31
doc: specify that it is the size of a buffer of tasks and not results
ebonnal Jan 10, 2025
332826a
Merge remote-tracking branch 'cpython/main' into fix-issue-29842
ebonnal Feb 24, 2025
ef814e5
Update Doc/whatsnew/3.14.rst
ebonnal Feb 24, 2025
26c8d8d
Merge branch 'main' into fix-issue-29842
encukou Mar 3, 2025
7b1d5f6
remove redundant `iter`
ebonnal Mar 3, 2025
8dac531
Merge remote-tracking branch 'cpython/main' into fix-issue-29842
ebonnal Mar 10, 2025
bb756f4
add new line in 3.14.rst
ebonnal Mar 12, 2025
12 changes: 9 additions & 3 deletions Doc/library/concurrent.futures.rst
@@ -40,11 +40,14 @@ Executor Objects
              future = executor.submit(pow, 323, 1235)
              print(future.result())

-   .. method:: map(fn, *iterables, timeout=None, chunksize=1)
+   .. method:: map(fn, *iterables, timeout=None, chunksize=1, buffersize=None)

       Similar to :func:`map(fn, *iterables) <map>` except:

-      * the *iterables* are collected immediately rather than lazily;
+      * The *iterables* are collected immediately rather than lazily, unless a
+        *buffersize* is specified to limit the number of submitted tasks whose
+        results have not yet been yielded. If the buffer is full, iteration over
+        the *iterables* pauses until a result is yielded from the buffer.

       * *fn* is executed asynchronously and several calls to
         *fn* may be made concurrently.
@@ -68,7 +71,10 @@ Executor Objects
       *chunksize* has no effect.

       .. versionchanged:: 3.5
-         Added the *chunksize* argument.
+         Added the *chunksize* parameter.
+
+      .. versionchanged:: next
+         Added the *buffersize* parameter.

    .. method:: shutdown(wait=True, *, cancel_futures=False)

7 changes: 7 additions & 0 deletions Doc/whatsnew/3.14.rst
@@ -461,6 +461,13 @@ contextvars
 * Support context manager protocol by :class:`contextvars.Token`.
   (Contributed by Andrew Svetlov in :gh:`129889`.)

+* Add the optional ``buffersize`` parameter to
+  :meth:`concurrent.futures.Executor.map` to limit the number of submitted
+  tasks whose results have not yet been yielded. If the buffer is full,
+  iteration over the *iterables* pauses until a result is yielded from the
+  buffer.
+  (Contributed by Enzo Bonnal and Josh Rosenberg in :gh:`74028`.)
+

 ctypes
 ------
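
For context on what the new parameter changes: when no `buffersize` is given, `Executor.map` submits one task per input element before it returns. The sketch below is illustrative only and not part of the patch; `count_consumed` and `source` are hypothetical names. It counts how many inputs have been pulled from a generator by the time `map` hands back its result iterator.

```python
from concurrent.futures import ThreadPoolExecutor


def count_consumed(n_items):
    """Return how many inputs map() pulled before any result was requested.

    Hypothetical helper for illustration; not part of the PR.
    """
    consumed = 0

    def source():
        nonlocal consumed
        for i in range(n_items):
            consumed += 1
            yield i

    with ThreadPoolExecutor(max_workers=2) as executor:
        # No buffersize: every input element is collected and submitted here.
        results = executor.map(str, source())
        pulled_before_iteration = consumed
        list(results)  # drain so all tasks finish before shutdown
    return pulled_before_iteration


print(count_consumed(100))  # 100: the whole input was consumed up front
```

With an unbounded or very large input this eager collection is what makes the default behavior impractical, which is the case `buffersize` is meant to address.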
32 changes: 30 additions & 2 deletions Lib/concurrent/futures/_base.py
@@ -8,6 +8,8 @@
 import threading
 import time
 import types
+import weakref
+from itertools import islice

 FIRST_COMPLETED = 'FIRST_COMPLETED'
 FIRST_EXCEPTION = 'FIRST_EXCEPTION'
@@ -572,7 +574,7 @@ def submit(self, fn, /, *args, **kwargs):
         """
         raise NotImplementedError()

-    def map(self, fn, *iterables, timeout=None, chunksize=1):
+    def map(self, fn, *iterables, timeout=None, chunksize=1, buffersize=None):
         """Returns an iterator equivalent to map(fn, iter).

         Args:
@@ -584,6 +586,11 @@ def map(self, fn, *iterables, timeout=None, chunksize=1):
                 before being passed to a child process. This argument is only
                 used by ProcessPoolExecutor; it is ignored by
                 ThreadPoolExecutor.
+            buffersize: The number of submitted tasks whose results have not
+                yet been yielded. If the buffer is full, iteration over the
+                iterables pauses until a result is yielded from the buffer.
+                If None, all input elements are eagerly collected, and a task is
+                submitted for each.

         Returns:
             An iterator equivalent to: map(func, *iterables) but the calls may
@@ -594,10 +601,25 @@ def map(self, fn, *iterables, timeout=None, chunksize=1):
                 before the given timeout.
             Exception: If fn(*args) raises for any values.
         """
+        if buffersize is not None and not isinstance(buffersize, int):
+            raise TypeError("buffersize must be an integer or None")
+        if buffersize is not None and buffersize < 1:
+            raise ValueError("buffersize must be None or > 0")
+
         if timeout is not None:
             end_time = timeout + time.monotonic()

-        fs = [self.submit(fn, *args) for args in zip(*iterables)]
+        zipped_iterables = zip(*iterables)
+        if buffersize:
+            fs = collections.deque(
+                self.submit(fn, *args) for args in islice(zipped_iterables, buffersize)
+            )
+        else:
+            fs = [self.submit(fn, *args) for args in zipped_iterables]
+
+        # Use a weak reference to ensure that the executor can be garbage
+        # collected independently of the result_iterator closure.
+        executor_weakref = weakref.ref(self)

         # Yield must be hidden in closure so that the futures are submitted
         # before the first iterator value is required.
@@ -606,6 +628,12 @@ def result_iterator():
                 # reverse to keep finishing order
                 fs.reverse()
                 while fs:
+                    if (
+                        buffersize
+                        and (executor := executor_weakref())
+                        and (args := next(zipped_iterables, None))
+                    ):
+                        fs.appendleft(executor.submit(fn, *args))
                     # Careful not to keep a reference to the popped future
                     if timeout is None:
                         yield _result_or_cancel(fs.pop())
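
The buffering logic in this hunk can be approximated outside the standard library. The following is a minimal sketch under stated assumptions, not the patch itself: `bounded_map` is a hypothetical helper that takes a single iterable, and it omits the timeout handling, the weak reference to the executor, and the cancel-on-error behavior of the real method. It keeps the same refill-then-yield order as the loop above, so one extra input is pulled just before each result is yielded.

```python
import collections
import itertools
from concurrent.futures import ThreadPoolExecutor
from itertools import islice


def bounded_map(executor, fn, iterable, buffersize):
    """Yield fn(item) in input order with a bounded buffer of pending tasks.

    Hypothetical stand-in for map(..., buffersize=...); illustration only.
    """
    if not isinstance(buffersize, int):
        raise TypeError("buffersize must be an integer")
    if buffersize < 1:
        raise ValueError("buffersize must be > 0")

    it = iter(iterable)
    # Prime the buffer eagerly: at most `buffersize` tasks are submitted
    # before this function returns.
    pending = collections.deque(
        executor.submit(fn, item) for item in islice(it, buffersize)
    )

    def result_iterator():
        # A sentinel is used because a single input item may be falsy.
        sentinel = object()
        while pending:
            # Refill first, then yield the oldest result (same order as the hunk).
            item = next(it, sentinel)
            if item is not sentinel:
                pending.append(executor.submit(fn, item))
            yield pending.popleft().result()

    return result_iterator()


with ThreadPoolExecutor(max_workers=2) as executor:
    squares = bounded_map(executor, lambda x: x * x, itertools.count(), buffersize=4)
    print(list(islice(squares, 5)))  # [0, 1, 4, 9, 16]
```

Because only a bounded number of tasks is ever pending, the infinite `itertools.count()` input is safe here, which mirrors the `test_map_buffersize_on_infinite_iterable` case in the test changes.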
10 changes: 8 additions & 2 deletions Lib/concurrent/futures/process.py
@@ -813,7 +813,7 @@ def submit(self, fn, /, *args, **kwargs):
             return f
     submit.__doc__ = _base.Executor.submit.__doc__

-    def map(self, fn, *iterables, timeout=None, chunksize=1):
+    def map(self, fn, *iterables, timeout=None, chunksize=1, buffersize=None):
         """Returns an iterator equivalent to map(fn, iter).

         Args:
@@ -824,6 +824,11 @@ def map(self, fn, *iterables, timeout=None, chunksize=1):
             chunksize: If greater than one, the iterables will be chopped into
                 chunks of size chunksize and submitted to the process pool.
                 If set to one, the items in the list will be sent one at a time.
+            buffersize: The number of submitted tasks whose results have not
+                yet been yielded. If the buffer is full, iteration over the
+                iterables pauses until a result is yielded from the buffer.
+                If None, all input elements are eagerly collected, and a task is
+                submitted for each.

         Returns:
             An iterator equivalent to: map(func, *iterables) but the calls may
@@ -839,7 +844,8 @@ def map(self, fn, *iterables, timeout=None, chunksize=1):

         results = super().map(partial(_process_chunk, fn),
                               itertools.batched(zip(*iterables), chunksize),
-                              timeout=timeout)
+                              timeout=timeout,
+                              buffersize=buffersize)
         return _chain_from_iterable_of_lists(results)

     def shutdown(self, wait=True, *, cancel_futures=False):
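
Reading the hunk above, `buffersize` is forwarded to the base `map()` together with the already batched iterable, so for `ProcessPoolExecutor` the buffer appears to count chunks rather than individual items. The sketch below illustrates that arithmetic without starting any processes. `batched` is a minimal stand-in for `itertools.batched` (Python 3.12+), and `items_pulled_when_priming` is a hypothetical helper, not part of the patch.

```python
from itertools import islice


def batched(iterable, n):
    """Minimal stand-in for itertools.batched: yield tuples of up to n items."""
    it = iter(iterable)
    while chunk := tuple(islice(it, n)):
        yield chunk


def items_pulled_when_priming(n_items, chunksize, buffersize):
    """Count input items consumed when `buffersize` chunks are taken up front.

    Hypothetical helper; models only the initial fill of the buffer.
    """
    pulled = 0

    def source():
        nonlocal pulled
        for i in range(n_items):
            pulled += 1
            yield i

    # The buffer is filled from the stream of chunks, not of items.
    list(islice(batched(source(), chunksize), buffersize))
    return pulled


print(items_pulled_when_priming(100, chunksize=5, buffersize=3))  # 15
```

Under this reading, up to `buffersize * chunksize` input items can be pulled before the first result is requested, which is worth keeping in mind when choosing both values together.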
70 changes: 70 additions & 0 deletions Lib/test/test_concurrent_futures/executor.py
@@ -1,7 +1,9 @@
+import itertools
 import threading
 import time
 import weakref
 from concurrent import futures
+from operator import add
 from test import support
 from test.support import Py_GIL_DISABLED

@@ -71,6 +73,74 @@ def test_map_timeout(self):

         self.assertEqual([None, None], results)

+    def test_map_buffersize_type_validation(self):
+        for buffersize in ("foo", 2.0):
+            with self.subTest(buffersize=buffersize):
+                with self.assertRaisesRegex(
+                    TypeError,
+                    "buffersize must be an integer or None",
+                ):
+                    self.executor.map(str, range(4), buffersize=buffersize)
+
+    def test_map_buffersize_value_validation(self):
+        for buffersize in (0, -1):
+            with self.subTest(buffersize=buffersize):
+                with self.assertRaisesRegex(
+                    ValueError,
+                    "buffersize must be None or > 0",
+                ):
+                    self.executor.map(str, range(4), buffersize=buffersize)
+
+    def test_map_buffersize(self):
+        ints = range(4)
+        for buffersize in (1, 2, len(ints), len(ints) * 2):
+            with self.subTest(buffersize=buffersize):
+                res = self.executor.map(str, ints, buffersize=buffersize)
+                self.assertListEqual(list(res), ["0", "1", "2", "3"])
+
+    def test_map_buffersize_on_multiple_iterables(self):
+        ints = range(4)
+        for buffersize in (1, 2, len(ints), len(ints) * 2):
+            with self.subTest(buffersize=buffersize):
+                res = self.executor.map(add, ints, ints, buffersize=buffersize)
+                self.assertListEqual(list(res), [0, 2, 4, 6])
+
+    def test_map_buffersize_on_infinite_iterable(self):
+        res = self.executor.map(str, itertools.count(), buffersize=2)
+        self.assertEqual(next(res, None), "0")
+        self.assertEqual(next(res, None), "1")
+        self.assertEqual(next(res, None), "2")
+
+    def test_map_buffersize_on_multiple_infinite_iterables(self):
+        res = self.executor.map(
+            add,
+            itertools.count(),
+            itertools.count(),
+            buffersize=2
+        )
+        self.assertEqual(next(res, None), 0)
+        self.assertEqual(next(res, None), 2)
+        self.assertEqual(next(res, None), 4)
+
+    def test_map_buffersize_on_empty_iterable(self):
+        res = self.executor.map(str, [], buffersize=2)
+        self.assertIsNone(next(res, None))
+
+    def test_map_buffersize_without_iterable(self):
+        res = self.executor.map(str, buffersize=2)
+        self.assertIsNone(next(res, None))
+
+    def test_map_buffersize_when_buffer_is_full(self):
+        ints = iter(range(4))
+        buffersize = 2
+        self.executor.map(str, ints, buffersize=buffersize)
+        self.executor.shutdown(wait=True)  # wait for tasks to complete
+        self.assertEqual(
+            next(ints),
+            buffersize,
+            msg="should have fetched only `buffersize` elements from `ints`.",
+        )
+
     def test_shutdown_race_issue12456(self):
         # Issue #12456: race condition at shutdown where trying to post a
         # sentinel in the call queue blocks (the queue is full while processes
@@ -0,0 +1,4 @@
+Add the optional ``buffersize`` parameter to
+:meth:`concurrent.futures.Executor.map` to limit the number of submitted tasks
+whose results have not yet been yielded. If the buffer is full, iteration over
+the *iterables* pauses until a result is yielded from the buffer.