From dc3cc33f8ac5747087d293d5ad337eb5858c2bdf Mon Sep 17 00:00:00 2001 From: "Nathaniel J. Smith" Date: Wed, 23 Jan 2019 01:01:36 -0800 Subject: [PATCH] Rewrite unix pipe fd handling logic This addresses a number of issues: - Fixes a major issue where aclose() called notify_fd_close() unconditionally, even if the fd was already closed; if the fd had already been recycled this could (and did) affect unrelated file descriptors: https://github.com/python-trio/trio/issues/661#issuecomment-456582356 - Fixes a theoretical issue (not yet observed in the wild) where a poorly timed close could fail to be noticed by other tasks (gh-661) - Adds ConflictDetectors to catch attempts to use the same stream from multiple tasks simultaneously - Switches from inheritance to composition (gh-830) Still todo: - Tests for these race conditions that snuck through - Audit _windows_pipes.py and _socket.py for related issues --- newsfragments/661.bugfix.rst | 3 + trio/_unix_pipes.py | 214 ++++++++++++++++++++-------------- trio/tests/test_unix_pipes.py | 28 ++--- 3 files changed, 144 insertions(+), 101 deletions(-) create mode 100644 newsfragments/661.bugfix.rst diff --git a/newsfragments/661.bugfix.rst b/newsfragments/661.bugfix.rst new file mode 100644 index 0000000000..8f573aa95f --- /dev/null +++ b/newsfragments/661.bugfix.rst @@ -0,0 +1,3 @@ +Fixed several bugs in the new subprocess pipe support, where +operations on a closed pipe could accidentally affect another +unrelated pipe due to internal file-descriptor reuse. diff --git a/trio/_unix_pipes.py b/trio/_unix_pipes.py index 629d66da3a..c6bba49d52 100644 --- a/trio/_unix_pipes.py +++ b/trio/_unix_pipes.py @@ -1,117 +1,161 @@ import fcntl import os from typing import Tuple +import errno from . 
import _core from ._abc import SendStream, ReceiveStream +from ._util import ConflictDetector __all__ = ["PipeSendStream", "PipeReceiveStream", "make_pipe"] -class _PipeMixin: - def __init__(self, pipefd: int) -> None: - if not isinstance(pipefd, int): - raise TypeError( - "{0.__class__.__name__} needs a pipe fd".format(self) - ) - - self._pipe = pipefd - self._closed = False - - flags = fcntl.fcntl(self._pipe, fcntl.F_GETFL) - fcntl.fcntl(self._pipe, fcntl.F_SETFL, flags | os.O_NONBLOCK) - - def _close(self): - if self._closed: +class _FdHolder: + # This class holds onto a raw file descriptor, in non-blocking mode, and + # is responsible for managing its lifecycle. In particular, it's + # responsible for making sure it gets closed, and also for tracking + # whether it's been closed. + # + # The way we track closure is to set the .fd field to -1, discarding the + # original value. You might think that this is a strange idea, since it + # overloads the same field to do two different things. Wouldn't it be more + # natural to have a dedicated .closed field? But that would be more + # error-prone. Fds are represented by small integers, and once an fd is + # closed, its integer value may be reused immediately. If we accidentally + # used the old fd after being closed, we might end up doing something to + # another unrelated fd that happened to get assigned the same integer + # value. By throwing away the integer value immediately, it becomes + # impossible to make this mistake – we'll just get an EBADF. + # + # (This trick was copied from the stdlib socket module.) 
+ def __init__(self, fd: int): + if not isinstance(fd, int): + raise TypeError("file descriptor must be an int") + self.fd = fd + # Flip the fd to non-blocking mode + flags = fcntl.fcntl(self.fd, fcntl.F_GETFL) + fcntl.fcntl(self.fd, fcntl.F_SETFL, flags | os.O_NONBLOCK) + + @property + def closed(self): + return self.fd == -1 + + def _raw_close(self): + # This doesn't assume it's in a trio context, so it can be called from + # __del__. You should never call it from Trio context, because it + # skips calling notify_fd_close. But from __del__, skipping that is + # OK, because notify_fd_close just wakes up other tasks that are + # waiting on this fd, and those tasks hold a reference to this object. + # So if __del__ is being called, we know there aren't any tasks that + # need to be woken. + if self.closed: return + fd = self.fd + self.fd = -1 + os.close(fd) - self._closed = True - os.close(self._pipe) + def __del__(self): + self._raw_close() async def aclose(self): - # XX: This would be in _close, but this can only be used from an - # async context. 
- _core.notify_fd_close(self._pipe) - self._close() + if not self.closed: + _core.notify_fd_close(self.fd) + self._raw_close() await _core.checkpoint() - def fileno(self) -> int: - """Gets the file descriptor for this pipe.""" - return self._pipe - - def __del__(self): - self._close() - -class PipeSendStream(_PipeMixin, SendStream): +class PipeSendStream(SendStream): """Represents a send stream over an os.pipe object.""" + def __init__(self, fd: int): + self._fd_holder = _FdHolder(fd) + self._conflict_detector = ConflictDetector( + "another task is using this pipe" + ) + async def send_all(self, data: bytes): - # we have to do this no matter what - await _core.checkpoint() - if self._closed: - raise _core.ClosedResourceError("this pipe is already closed") + async with self._conflict_detector: + # have to check up front, because send_all(b"") on a closed pipe + # should raise + if self._fd_holder.closed: + raise _core.ClosedResourceError("this pipe was already closed") + + length = len(data) + # adapted from the SocketStream code + with memoryview(data) as view: + sent = 0 + while sent < length: + with view[sent:] as remaining: + try: + sent += os.write(self._fd_holder.fd, remaining) + except BlockingIOError: + await _core.wait_writable(self._fd_holder.fd) + except OSError as e: + if e.errno == errno.EBADF: + raise _core.ClosedResourceError( + "this pipe was closed" + ) from None + else: + raise _core.BrokenResourceError from e - if not data: - return + async def wait_send_all_might_not_block(self) -> None: + async with self._conflict_detector: + if self._fd_holder.closed: + raise _core.ClosedResourceError("this pipe was already closed") + try: + await _core.wait_writable(self._fd_holder.fd) + except BrokenPipeError as e: + # kqueue: raises EPIPE on wait_writable instead + # of sending, which is annoying + raise _core.BrokenResourceError from e - length = len(data) - # adapted from the SocketStream code - with memoryview(data) as view: - total_sent = 0 - while 
total_sent < length: - with view[total_sent:] as remaining: - try: - total_sent += os.write(self._pipe, remaining) - except BrokenPipeError as e: - await _core.checkpoint() - raise _core.BrokenResourceError from e - except BlockingIOError: - await self.wait_send_all_might_not_block() + async def aclose(self): + await self._fd_holder.aclose() - async def wait_send_all_might_not_block(self) -> None: - if self._closed: - await _core.checkpoint() - raise _core.ClosedResourceError("This pipe is already closed") - - try: - await _core.wait_writable(self._pipe) - except BrokenPipeError as e: - # kqueue: raises EPIPE on wait_writable instead - # of sending, which is annoying - # also doesn't checkpoint so we have to do that - # ourselves here too - await _core.checkpoint() - raise _core.BrokenResourceError from e - - -class PipeReceiveStream(_PipeMixin, ReceiveStream): + def fileno(self): + return self._fd_holder.fd + + +class PipeReceiveStream(ReceiveStream): """Represents a receive stream over an os.pipe object.""" + def __init__(self, fd: int): + self._fd_holder = _FdHolder(fd) + self._conflict_detector = ConflictDetector( + "another task is using this pipe" + ) + async def receive_some(self, max_bytes: int) -> bytes: - if self._closed: - await _core.checkpoint() - raise _core.ClosedResourceError("this pipe is already closed") + async with self._conflict_detector: + if not isinstance(max_bytes, int): + raise TypeError("max_bytes must be integer >= 1") + + if max_bytes < 1: + raise ValueError("max_bytes must be integer >= 1") + + while True: + try: + data = os.read(self._fd_holder.fd, max_bytes) + except BlockingIOError: + await _core.wait_readable(self._fd_holder.fd) + except OSError as e: + await _core.cancel_shielded_checkpoint() + if e.errno == errno.EBADF: + raise _core.ClosedResourceError( + "this pipe was closed" + ) from None + else: + raise _core.BrokenResourceError from e + else: + break - if not isinstance(max_bytes, int): - await _core.checkpoint() - raise 
TypeError("max_bytes must be integer >= 1") + return data - if max_bytes < 1: - await _core.checkpoint() - raise ValueError("max_bytes must be integer >= 1") + async def aclose(self): + await self._fd_holder.aclose() - while True: - try: - await _core.checkpoint_if_cancelled() - data = os.read(self._pipe, max_bytes) - except BlockingIOError: - await _core.wait_readable(self._pipe) - else: - await _core.cancel_shielded_checkpoint() - break - - return data + def fileno(self): + return self._fd_holder.fd async def make_pipe() -> Tuple[PipeSendStream, PipeReceiveStream]: diff --git a/trio/tests/test_unix_pipes.py b/trio/tests/test_unix_pipes.py index 2e8815ca06..45b62d4098 100644 --- a/trio/tests/test_unix_pipes.py +++ b/trio/tests/test_unix_pipes.py @@ -16,26 +16,22 @@ async def test_send_pipe(): r, w = os.pipe() - send = PipeSendStream(w) - assert send.fileno() == w - await send.send_all(b"123") - assert (os.read(r, 8)) == b"123" + async with PipeSendStream(w) as send: + assert send.fileno() == w + await send.send_all(b"123") + assert (os.read(r, 8)) == b"123" - os.close(r) - os.close(w) - send._closed = True + os.close(r) async def test_receive_pipe(): r, w = os.pipe() - recv = PipeReceiveStream(r) - assert (recv.fileno()) == r - os.write(w, b"123") - assert (await recv.receive_some(8)) == b"123" + async with PipeReceiveStream(r) as recv: + assert (recv.fileno()) == r + os.write(w, b"123") + assert (await recv.receive_some(8)) == b"123" - os.close(r) - os.close(w) - recv._closed = True + os.close(w) async def test_pipes_combined(): @@ -90,8 +86,8 @@ async def test_async_with(): async with w, r: pass - assert w._closed - assert r._closed + assert w.fileno() == -1 + assert r.fileno() == -1 with pytest.raises(OSError) as excinfo: os.close(w.fileno())