Skip to content

Commit 763801a

Browse files
gh-95166: cancel map waited on future on timeout (GH-95169) (GH-95364)
Co-authored-by: Kumar Aditya <59607654+kumaraditya303@users.noreply.github.com> (cherry picked from commit e16d4ed) Co-authored-by: Thomas Grainger <tagrain@gmail.com>
1 parent 1230792 commit 763801a

File tree

3 files changed

+42
-2
lines changed

3 files changed

+42
-2
lines changed

Lib/concurrent/futures/_base.py

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -310,6 +310,18 @@ def wait(fs, timeout=None, return_when=ALL_COMPLETED):
310310
done.update(waiter.finished_futures)
311311
return DoneAndNotDoneFutures(done, fs - done)
312312

313+
314+
def _result_or_cancel(fut, timeout=None):
315+
try:
316+
try:
317+
return fut.result(timeout)
318+
finally:
319+
fut.cancel()
320+
finally:
321+
# Break a reference cycle with the exception in self._exception
322+
del fut
323+
324+
313325
class Future(object):
314326
"""Represents the result of an asynchronous computation."""
315327

@@ -604,9 +616,9 @@ def result_iterator():
604616
while fs:
605617
# Careful not to keep a reference to the popped future
606618
if timeout is None:
607-
yield fs.pop().result()
619+
yield _result_or_cancel(fs.pop())
608620
else:
609-
yield fs.pop().result(end_time - time.monotonic())
621+
yield _result_or_cancel(fs.pop(), end_time - time.monotonic())
610622
finally:
611623
for future in fs:
612624
future.cancel()

Lib/test/test_concurrent_futures.py

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -931,6 +931,33 @@ def submit(pool):
931931
with futures.ProcessPoolExecutor(1, mp_context=mp.get_context('fork')) as workers:
932932
workers.submit(tuple)
933933

934+
def test_executor_map_current_future_cancel(self):
935+
stop_event = threading.Event()
936+
log = []
937+
938+
def log_n_wait(ident):
939+
log.append(f"{ident=} started")
940+
try:
941+
stop_event.wait()
942+
finally:
943+
log.append(f"{ident=} stopped")
944+
945+
with self.executor_type(max_workers=1) as pool:
946+
# submit work to saturate the pool
947+
fut = pool.submit(log_n_wait, ident="first")
948+
try:
949+
with contextlib.closing(
950+
pool.map(log_n_wait, ["second", "third"], timeout=0)
951+
) as gen:
952+
with self.assertRaises(TimeoutError):
953+
next(gen)
954+
finally:
955+
stop_event.set()
956+
fut.result()
957+
# ident='second' is cancelled as a result of raising a TimeoutError
958+
# ident='third' is cancelled because it remained in the collection of futures
959+
self.assertListEqual(log, ["ident='first' started", "ident='first' stopped"])
960+
934961

935962
class ProcessPoolExecutorTest(ExecutorTest):
936963

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Fix :meth:`concurrent.futures.Executor.map` to cancel the currently waiting on future on an error - e.g. TimeoutError or KeyboardInterrupt.

0 commit comments

Comments
 (0)