From dfb1b9da8a4becaeaed3d9cffcaac41bcaf746f4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?G=C3=A9ry=20Ogam?= Date: Wed, 4 May 2022 01:49:57 +0200 Subject: [PATCH] bpo-47029: Fix BrokenPipeError in multiprocessing.Queue at garbage collection and explicit close (#31913) --- Lib/multiprocessing/queues.py | 23 +++++++++---------- .../2022-04-26-19-01-13.bpo-47029.qkT42X.rst | 4 ++++ 2 files changed, 15 insertions(+), 12 deletions(-) create mode 100644 Misc/NEWS.d/next/Library/2022-04-26-19-01-13.bpo-47029.qkT42X.rst diff --git a/Lib/multiprocessing/queues.py b/Lib/multiprocessing/queues.py index a2901814876d6c..f37f114a968871 100644 --- a/Lib/multiprocessing/queues.py +++ b/Lib/multiprocessing/queues.py @@ -139,13 +139,10 @@ def put_nowait(self, obj): def close(self): self._closed = True - try: - self._reader.close() - finally: - close = self._close - if close: - self._close = None - close() + close = self._close + if close: + self._close = None + close() def join_thread(self): debug('Queue.join_thread()') @@ -169,8 +166,9 @@ def _start_thread(self): self._thread = threading.Thread( target=Queue._feed, args=(self._buffer, self._notempty, self._send_bytes, - self._wlock, self._writer.close, self._ignore_epipe, - self._on_queue_feeder_error, self._sem), + self._wlock, self._reader.close, self._writer.close, + self._ignore_epipe, self._on_queue_feeder_error, + self._sem), name='QueueFeederThread' ) self._thread.daemon = True @@ -211,8 +209,8 @@ def _finalize_close(buffer, notempty): notempty.notify() @staticmethod - def _feed(buffer, notempty, send_bytes, writelock, close, ignore_epipe, - onerror, queue_sem): + def _feed(buffer, notempty, send_bytes, writelock, reader_close, + writer_close, ignore_epipe, onerror, queue_sem): debug('starting thread to feed data to pipe') nacquire = notempty.acquire nrelease = notempty.release @@ -238,7 +236,8 @@ def _feed(buffer, notempty, send_bytes, writelock, close, ignore_epipe, obj = bpopleft() if obj is sentinel: debug('feeder thread got sentinel -- exiting') - close() + reader_close() + writer_close() return # serialize the data before acquiring the lock diff --git a/Misc/NEWS.d/next/Library/2022-04-26-19-01-13.bpo-47029.qkT42X.rst b/Misc/NEWS.d/next/Library/2022-04-26-19-01-13.bpo-47029.qkT42X.rst new file mode 100644 index 00000000000000..cc054673338f0b --- /dev/null +++ b/Misc/NEWS.d/next/Library/2022-04-26-19-01-13.bpo-47029.qkT42X.rst @@ -0,0 +1,4 @@ +Always close the read end of the pipe used by :class:`multiprocessing.Queue` +*after* the last write of buffered data to the write end of the pipe to avoid +:exc:`BrokenPipeError` at garbage collection and at +:meth:`multiprocessing.Queue.close` calls. Patch by Géry Ogam.