Skip to content

Commit

Permalink
Convert workers to use PipeSendChannels or FdStreams
Browse files Browse the repository at this point in the history
  • Loading branch information
richardsheridan committed Nov 17, 2020
1 parent b3b5bae commit 7f637d2
Showing 1 changed file with 97 additions and 7 deletions.
104 changes: 97 additions & 7 deletions trio/_worker_processes.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
import os
import struct
from collections import deque
from itertools import count
from multiprocessing import Pipe, Process, Barrier
from multiprocessing.reduction import ForkingPickler
from threading import BrokenBarrierError

import trio
Expand Down Expand Up @@ -35,9 +37,13 @@ def current_default_process_limiter():


# Pick the platform-appropriate pipe wrappers and the primitive used to
# wait on the worker process's sentinel.
if os.name == "nt":
    # Windows: wrap the multiprocessing pipe handles in trio channels.
    from trio._windows_pipes import PipeSendChannel, PipeReceiveChannel

    # TODO: This uses a thread per-process. Can we do better?
    wait_sentinel = trio.lowlevel.WaitForSingleObject
else:
    # POSIX: pipe fds can be awaited directly.
    from trio.lowlevel import FdStream

    wait_sentinel = trio.lowlevel.wait_readable


Expand Down Expand Up @@ -144,21 +150,18 @@ def worker_fn():
async def run_sync(self, sync_fn, *args):
# Neither this nor the child process should be waiting at this point
assert not self._barrier.n_waiting, "Must first wake_up() the WorkerProc"
self._rehabilitate_pipes()
async with trio.open_nursery() as nursery:
try:
await nursery.start(self._child_monitor)
await trio.to_thread.run_sync(
self._send_pipe.send, (sync_fn, args), cancellable=True
)
result = await trio.to_thread.run_sync(
self._recv_pipe.recv, cancellable=True
)
await self._send(ForkingPickler.dumps((sync_fn, args)))
result = ForkingPickler.loads(await self._recv())
except trio.Cancelled:
# Cancellation leaves the process in an unknown state so
# there is no choice but to kill, anyway it frees the pipe threads
self.kill()
raise
except EOFError:
except trio.EndOfChannel:
# Likely the worker died while we were waiting on a pipe
self.kill()
# sleep and let the monitor raise the appropriate error
Expand Down Expand Up @@ -193,6 +196,93 @@ def kill(self):
except AttributeError:
self._proc.terminate()

if os.name == "nt":

def _rehabilitate_pipes(self):
    """Lazily wrap the inherited pipe handles in trio channels.

    Channel construction must happen inside an async context, so it
    is deferred to here instead of __init__; that lets the object
    itself be instantiated anywhere, e.g. in a thread.
    """
    if hasattr(self, "_send_chan"):
        return  # already wrapped on an earlier call
    send_handle = self._send_pipe.fileno()
    recv_handle = self._recv_pipe.fileno()
    self._send_chan = PipeSendChannel(send_handle)
    self._recv_chan = PipeReceiveChannel(recv_handle)

async def _recv(self):
    """Receive one pickled message from the worker's pipe channel."""
    try:
        result = await self._recv_chan.receive()
    except trio.ClosedResourceError as e:
        # A "process"-flavored closure means the worker likely died
        # but the channel's completion fired first; park here and let
        # the child monitor raise the appropriate error.
        if "process" in str(e):
            await trio.sleep_forever()
        raise
    return result

async def _send(self, buf):
    """Forward one pickled payload to the worker over the pipe channel."""
    channel = self._send_chan
    await channel.send(buf)

def __del__(self):
    """Neutralize channel handles so GC never double-closes them.

    multiprocessing owns the underlying pipe handles and closes them
    itself; blanking the holders avoids __del__-time errors
    (GH#174, GH#1767).
    """
    if not hasattr(self, "_send_chan"):
        return  # channels were never created (_rehabilitate_pipes not run)
    for chan in (self._send_chan, self._recv_chan):
        chan._handle_holder.handle = -1

else:

def _rehabilitate_pipes(self):
    """Lazily wrap the inherited pipe fds in trio FdStreams.

    FdStream construction needs an async context, so it is deferred
    out of __init__; the object itself can therefore be created
    anywhere, e.g. in a thread.
    """
    if hasattr(self, "_send_stream"):
        return  # streams already built on an earlier call
    self._send_stream = FdStream(self._send_pipe.fileno())
    self._recv_stream = FdStream(self._recv_pipe.fileno())

async def _recv(self):
    """Receive one length-prefixed message from the worker.

    Wire format follows multiprocessing.Connection: a 4-byte
    big-endian signed length ("!i"), where -1 escapes to an 8-byte
    unsigned length ("!Q") for oversized payloads.

    Returns the message body as a bytearray.

    Raises trio.EndOfChannel if the worker closed the pipe cleanly
    at a message boundary (the caller treats this as "worker died"),
    or OSError if the stream ends partway through a message.
    """

    async def receive_exactly(n):
        # FdStream.receive_some may return fewer than n bytes, so
        # loop until the full count arrives.
        data = bytearray()
        while len(data) < n:
            chunk = await self._recv_stream.receive_some(n - len(data))
            if not chunk:
                raise OSError("got end of file during message")
            data += chunk
        return data

    first = await self._recv_stream.receive_some(4)
    if not first:
        # Clean EOF before any header bytes: the worker closed its
        # end (probably died); signal it the way run_sync expects.
        raise trio.EndOfChannel
    header = bytearray(first)
    if len(header) < 4:
        header += await receive_exactly(4 - len(header))
    (size,) = struct.unpack("!i", header)
    if size == -1:
        # Large-message escape: the real length follows as "!Q".
        # (Original code was missing the await on this read.)
        (size,) = struct.unpack("!Q", await receive_exactly(8))
    return await receive_exactly(size)

async def _send(self, buf):
    """Send one length-prefixed message to the worker.

    Wire format follows multiprocessing.Connection: a 4-byte
    big-endian signed length ("!i"), with the -1 / "!Q" escape for
    payloads over 2 GiB (kept for wire compatibility with Python
    3.7 and lower).

    NOTE(review): trio streams expose send_all(), not send() - the
    original called self._send_stream.send(), which does not exist
    on FdStream and would raise AttributeError.
    """
    n = len(buf)
    if n > 0x7FFFFFFF:
        pre_header = struct.pack("!i", -1)
        header = struct.pack("!Q", n)
        await self._send_stream.send_all(pre_header)
        await self._send_stream.send_all(header)
        await self._send_stream.send_all(buf)
    else:
        # For wire compatibility with 3.7 and lower
        header = struct.pack("!i", n)
        if n > 16384:
            # The payload is large so Nagle's algorithm won't be triggered
            # and we'd better avoid the cost of concatenation.
            await self._send_stream.send_all(header)
            await self._send_stream.send_all(buf)
        else:
            # Issue #20540: concatenate before sending, to avoid delays due
            # to Nagle's algorithm on a TCP socket.
            # Also note we want to avoid sending a 0-length buffer separately,
            # to avoid "broken pipe" errors if the other end closed the pipe.
            await self._send_stream.send_all(header + buf)

def __del__(self):
    """Drop stream fds so GC never double-closes them.

    multiprocessing owns the underlying pipe fds and closes them
    itself; blanking the holders avoids __del__-time errors
    (GH#174, GH#1767).
    """
    if not hasattr(self, "_send_stream"):
        return  # streams were never created (_rehabilitate_pipes not run)
    for stream in (self._send_stream, self._recv_stream):
        stream._fd_holder.fd = -1


async def to_process_run_sync(sync_fn, *args, cancellable=False, limiter=None):
"""Run sync_fn in a separate process
Expand Down

0 comments on commit 7f637d2

Please sign in to comment.