diff --git a/trio/_worker_processes.py b/trio/_worker_processes.py index fb8bf1f817..5e9c22f853 100644 --- a/trio/_worker_processes.py +++ b/trio/_worker_processes.py @@ -1,7 +1,9 @@ import os +import struct from collections import deque from itertools import count from multiprocessing import Pipe, Process, Barrier +from multiprocessing.reduction import ForkingPickler from threading import BrokenBarrierError import trio @@ -35,9 +37,13 @@ def current_default_process_limiter(): if os.name == "nt": + from trio._windows_pipes import PipeSendChannel, PipeReceiveChannel + # TODO: This uses a thread per-process. Can we do better? wait_sentinel = trio.lowlevel.WaitForSingleObject else: + from trio.lowlevel import FdStream + wait_sentinel = trio.lowlevel.wait_readable @@ -144,21 +150,18 @@ def worker_fn(): async def run_sync(self, sync_fn, *args): # Neither this nor the child process should be waiting at this point assert not self._barrier.n_waiting, "Must first wake_up() the WorkerProc" + self._rehabilitate_pipes() async with trio.open_nursery() as nursery: try: await nursery.start(self._child_monitor) - await trio.to_thread.run_sync( - self._send_pipe.send, (sync_fn, args), cancellable=True - ) - result = await trio.to_thread.run_sync( - self._recv_pipe.recv, cancellable=True - ) + await self._send(ForkingPickler.dumps((sync_fn, args))) + result = ForkingPickler.loads(await self._recv()) except trio.Cancelled: # Cancellation leaves the process in an unknown state so # there is no choice but to kill, anyway it frees the pipe threads self.kill() raise - except EOFError: + except trio.EndOfChannel: # Likely the worker died while we were waiting on a pipe self.kill() # sleep and let the monitor raise the appropriate error @@ -193,6 +196,93 @@ def kill(self): except AttributeError: self._proc.terminate() + if os.name == "nt": + + def _rehabilitate_pipes(self): + # These must be created in an async context, so defer so + # that this object can be instantiated in e.g. a thread + if not hasattr(self, "_send_chan"): + self._send_chan = PipeSendChannel(self._send_pipe.fileno()) + self._recv_chan = PipeReceiveChannel(self._recv_pipe.fileno()) + + async def _recv(self): + try: + return await self._recv_chan.receive() + except trio.ClosedResourceError as e: + if "process" not in str(e): + raise + # worker probably died but the channel iocp fired first + await trio.sleep_forever() # wait for monitor to see it + + async def _send(self, buf): + await self._send_chan.send(buf) + + def __del__(self): + # Avoid __del__ errors on cleanup: GH#174, GH#1767 + # multiprocessing will close them for us + if hasattr(self, "_send_chan"): + self._send_chan._handle_holder.handle = -1 + self._recv_chan._handle_holder.handle = -1 + + else: + + def _rehabilitate_pipes(self): + # These must be created in an async context, so defer so + # that this object can be instantiated in e.g. a thread + if not hasattr(self, "_send_stream"): + self._send_stream = FdStream(self._send_pipe.fileno()) + self._recv_stream = FdStream(self._recv_pipe.fileno()) + + async def _recv(self): + buf = await self._recv_stream.receive_some(4) + (size,) = struct.unpack("!i", buf) + if size == -1: + buf = self._recv_stream.receive_some(8) + (size,) = struct.unpack("!Q", buf) + result_bytes = bytearray() + async for partial_result in self._recv_stream: + result_bytes.extend(partial_result) + if len(result_bytes) == size: + break + elif len(result_bytes) > size: + raise RuntimeError("Oversized response") + # else: + # # worker probably died but the stream closed first + # await trio.sleep_forever() # wait for monitor to see it + if len(result_bytes) < size: + raise OSError("got end of file during message") + return result_bytes + + async def _send(self, buf): + n = len(buf) + if n > 0x7FFFFFFF: + pre_header = struct.pack("!i", -1) + header = struct.pack("!Q", n) + await self._send_stream.send(pre_header) + await self._send_stream.send(header) + await self._send_stream.send(buf) + else: + # For wire compatibility with 3.7 and lower + header = struct.pack("!i", n) + if n > 16384: + # The payload is large so Nagle's algorithm won't be triggered + # and we'd better avoid the cost of concatenation. + await self._send_stream.send(header) + await self._send_stream.send(buf) + else: + # Issue #20540: concatenate before sending, to avoid delays due + # to Nagle's algorithm on a TCP socket. + # Also note we want to avoid sending a 0-length buffer separately, + # to avoid "broken pipe" errors if the other end closed the pipe. + await self._send_stream.send(header + buf) + + def __del__(self): + # Avoid __del__ errors on cleanup: GH#174, GH#1767 + # multiprocessing will close them for us + if hasattr(self, "_send_stream"): + self._send_stream._fd_holder.fd = -1 + self._recv_stream._fd_holder.fd = -1 + async def to_process_run_sync(sync_fn, *args, cancellable=False, limiter=None): """Run sync_fn in a separate process