gh-74028: concurrent.futures.Executor.map: introduce buffersize param for lazier behavior #125663
--- a/Lib/concurrent/futures/_base.py
+++ b/Lib/concurrent/futures/_base.py
@@ -8,6 +8,8 @@
 import threading
 import time
 import types
+import weakref
+from itertools import islice

 FIRST_COMPLETED = 'FIRST_COMPLETED'
 FIRST_EXCEPTION = 'FIRST_EXCEPTION'
@@ -572,7 +574,7 @@ def submit(self, fn, /, *args, **kwargs):
         """
         raise NotImplementedError()

-    def map(self, fn, *iterables, timeout=None, chunksize=1):
+    def map(self, fn, *iterables, timeout=None, chunksize=1, buffersize=None):
         """Returns an iterator equivalent to map(fn, iter).

         Args:
@@ -584,6 +586,11 @@ def map(self, fn, *iterables, timeout=None, chunksize=1):
                 before being passed to a child process. This argument is only
                 used by ProcessPoolExecutor; it is ignored by
                 ThreadPoolExecutor.
+            buffersize: The number of submitted tasks whose results have not
+                yet been yielded. If the buffer is full, iteration over the
+                iterables pauses until a result is yielded from the buffer.
+                If None, all input elements are eagerly collected, and a task is
+                submitted for each.

         Returns:
             An iterator equivalent to: map(func, *iterables) but the calls may
@@ -594,10 +601,25 @@ def map(self, fn, *iterables, timeout=None, chunksize=1):
                 before the given timeout.
             Exception: If fn(*args) raises for any values.
         """
+        if buffersize is not None and not isinstance(buffersize, int):
+            raise TypeError("buffersize must be an integer or None")
+        if buffersize is not None and buffersize < 1:
+            raise ValueError("buffersize must be None or > 0")
+
         if timeout is not None:
             end_time = timeout + time.monotonic()

-        fs = [self.submit(fn, *args) for args in zip(*iterables)]
+        zipped_iterables = zip(*iterables)
+        if buffersize:
+            fs = collections.deque(
+                self.submit(fn, *args) for args in islice(zipped_iterables, buffersize)
+            )
+        else:
+            fs = [self.submit(fn, *args) for args in zipped_iterables]

+        # Use a weak reference to ensure that the executor can be garbage
+        # collected independently of the result_iterator closure.
+        executor_weakref = weakref.ref(self)
+
         # Yield must be hidden in closure so that the futures are submitted
         # before the first iterator value is required.
@@ -606,6 +628,12 @@ def result_iterator():
                 # reverse to keep finishing order
                 fs.reverse()
                 while fs:
+                    if (
@ebonnal I believe you got this part slightly wrong, "off-by-one". IIUC, the number of pending futures cannot be larger than […]. Fortunately, looks like the fix is trivial: you simply have to yield first, next append to the queue:

diff --git a/Lib/concurrent/futures/_base.py b/Lib/concurrent/futures/_base.py
index d98b1ebdd58..3b9ccf4d651 100644
--- a/Lib/concurrent/futures/_base.py
+++ b/Lib/concurrent/futures/_base.py
@@ -628,17 +628,17 @@ def result_iterator():
                 # reverse to keep finishing order
                 fs.reverse()
                 while fs:
+                    # Careful not to keep a reference to the popped future
+                    if timeout is None:
+                        yield _result_or_cancel(fs.pop())
+                    else:
+                        yield _result_or_cancel(fs.pop(), end_time - time.monotonic())
                     if (
                         buffersize
                         and (executor := executor_weakref())
                         and (args := next(zipped_iterables, None))
                     ):
                         fs.appendleft(executor.submit(fn, *args))
-                    # Careful not to keep a reference to the popped future
-                    if timeout is None:
-                        yield _result_or_cancel(fs.pop())
-                    else:
-                        yield _result_or_cancel(fs.pop(), end_time - time.monotonic())
             finally:
                 for future in fs:
                     future.cancel()

Hi @dalcinl,

TL;DR: fyi we have #131467 that is open and tackling this "off-by-one" situation. Would be great to get your review there! 🙏🏻

You will notice that it is not "just" moving the yield before the enqueue, let's see why on a simple scenario, explaining the 3 behaviors:

Scenario:

it = executor.map(fn, iterable, buffersize=buffersize)
# point A
next(it)
# point B
next(it)
# point C

"enqueue -> wait -> yield" (current, introduced by this PR) behavior
pro:

"wait -> yield -> enqueue" (your proposal) behavior
pro: never exceed

"wait -> enqueue -> yield" (#131467) behavior
pros:

Let me know if it makes sense 🙏🏻
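The three orderings in the scenario above can be compared with a small synchronous model. This is only a sketch under stated assumptions: plain items stand in for futures, "wait" is instantaneous, and `simulate` and its order labels are hypothetical names, not CPython code. It records the buffer length at points A, B and C, plus the peak length reached in between.

```python
import collections
from itertools import islice

_DONE = object()  # sentinel marking an exhausted input iterator


def simulate(order, items, buffersize):
    """Hypothetical model: return ([len at A, B, C], peak buffer length)."""
    source = iter(items)
    # Initial fill, done eagerly when the iterator is created (before point A).
    buffer = collections.deque(islice(source, buffersize))
    peak = len(buffer)

    def enqueue():
        nonlocal peak
        item = next(source, _DONE)
        if item is not _DONE:
            buffer.append(item)
            peak = max(peak, len(buffer))

    def results():
        while buffer:
            if order == "enqueue-wait-yield":
                enqueue()
                yield buffer.popleft()
            elif order == "wait-yield-enqueue":
                yield buffer.popleft()
                enqueue()  # only runs when the consumer asks for the next result
            elif order == "wait-enqueue-yield":
                result = buffer.popleft()
                enqueue()
                yield result
            else:
                raise ValueError(f"unknown order: {order!r}")

    it = results()
    sizes = [len(buffer)]      # point A
    next(it)
    sizes.append(len(buffer))  # point B
    next(it)
    sizes.append(len(buffer))  # point C
    return sizes, peak


for order in ("enqueue-wait-yield", "wait-yield-enqueue", "wait-enqueue-yield"):
    print(order, simulate(order, range(10), buffersize=3))
# enqueue-wait-yield ([3, 3, 3], 4)
# wait-yield-enqueue ([3, 2, 2], 3)
# wait-enqueue-yield ([3, 3, 3], 3)
```

In this model, the first ordering briefly holds `buffersize + 1` entries, the second never exceeds `buffersize` but leaves the buffer one short while the consumer works on a result, and the third keeps the buffer full without exceeding it.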
+                        buffersize
+                        and (executor := executor_weakref())
+                        and (args := next(zipped_iterables, None))
+                    ):
+                        fs.appendleft(executor.submit(fn, *args))
                     # Careful not to keep a reference to the popped future
                     if timeout is None:
                         yield _result_or_cancel(fs.pop())
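For anyone who wants to try the buffering logic outside CPython, here is a minimal standalone sketch of the same pattern: submit up to `buffersize` tasks up front, then top up by one task per yielded result. `buffered_map` is a hypothetical helper, not part of `concurrent.futures`. It assumes a single input iterable, ignores `timeout`, skips the weak reference to the executor, and uses a sentinel instead of the `None` check in the diff. It follows the "enqueue -> wait -> yield" order of this PR.

```python
import collections
from concurrent.futures import ThreadPoolExecutor
from itertools import islice


def buffered_map(executor, fn, iterable, buffersize):
    """Hypothetical helper: map fn over iterable with a bounded task buffer."""
    if not isinstance(buffersize, int):
        raise TypeError("buffersize must be an integer")
    if buffersize < 1:
        raise ValueError("buffersize must be > 0")

    args_iter = iter(iterable)
    # Eagerly submit only the first `buffersize` tasks.
    pending = collections.deque(
        executor.submit(fn, arg) for arg in islice(args_iter, buffersize)
    )
    exhausted = object()  # sentinel, so that falsy inputs are still submitted

    def result_iterator():
        try:
            while pending:
                # Enqueue one more task, then wait for the oldest one.
                arg = next(args_iter, exhausted)
                if arg is not exhausted:
                    pending.append(executor.submit(fn, arg))
                yield pending.popleft().result()
        finally:
            # Cancel whatever is still pending if the consumer stops early.
            for future in pending:
                future.cancel()

    return result_iterator()


consumed = []


def numbers():
    for i in range(10):
        consumed.append(i)  # record when each input element is pulled
        yield i


def square(x):
    return x * x


with ThreadPoolExecutor(max_workers=2) as pool:
    results = buffered_map(pool, square, numbers(), buffersize=3)
    consumed_at_a = len(consumed)  # 3: only the initial buffer was pulled
    first = next(results)
    consumed_at_b = len(consumed)  # 4: one more element pulled per result
    rest = list(results)
```

The input generator is advanced only from the consuming thread, so the two counts above do not depend on how fast the worker threads run.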
@@ -0,0 +1,4 @@
+Add the optional ``buffersize`` parameter to
+:meth:`concurrent.futures.Executor.map` to limit the number of submitted tasks
+whose results have not yet been yielded. If the buffer is full, iteration over
+the *iterables* pauses until a result is yielded from the buffer.
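A short usage sketch for the parameter described in this entry. It feature-detects `buffersize` instead of assuming a particular Python version, and falls back to the eager behavior when the running interpreter does not include this change. The names `lines`, `pulled` and `HAS_BUFFERSIZE` are illustrative only.

```python
import inspect
from concurrent.futures import Executor, ThreadPoolExecutor

# Detect whether this interpreter's Executor.map accepts `buffersize`.
HAS_BUFFERSIZE = "buffersize" in inspect.signature(Executor.map).parameters

pulled = []


def lines():
    """Illustrative input generator that records when elements are pulled."""
    for i in range(6):
        pulled.append(i)
        yield f"line {i}"


with ThreadPoolExecutor(max_workers=2) as pool:
    kwargs = {"buffersize": 2} if HAS_BUFFERSIZE else {}
    results = pool.map(str.upper, lines(), **kwargs)
    # Without buffersize, all 6 inputs have already been pulled at this point.
    # With buffersize, only the initial buffer should have been pulled.
    pulled_before_iteration = len(pulled)
    output = list(results)
```

Results come back in input order either way; the difference is only in how far ahead of the consumer the input iterable is read.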