Skip to content

Commit

Permalink
Merge pull request #791 from oremanj/subprocess
Browse files Browse the repository at this point in the history
Add subprocess support
  • Loading branch information
njsmith authored Dec 27, 2018
2 parents 4a66c64 + 8ec3a6c commit c212ce8
Show file tree
Hide file tree
Showing 23 changed files with 1,672 additions and 225 deletions.
170 changes: 163 additions & 7 deletions docs/source/reference-io.rst
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ create complex transport configurations. Here's some examples:
speak SSL over the network is to wrap an
:class:`~trio.ssl.SSLStream` around a :class:`~trio.SocketStream`.

* If you spawn a subprocess then you can get a
* If you spawn a :ref:`subprocess`, you can get a
:class:`~trio.abc.SendStream` that lets you write to its stdin, and
a :class:`~trio.abc.ReceiveStream` that lets you read from its
stdout. If for some reason you wanted to speak SSL to a subprocess,
Expand All @@ -36,9 +36,6 @@ create complex transport configurations. Here's some examples:
ssl_context.check_hostname = False
s = SSLStream(StapledStream(process.stdin, process.stdout), ssl_context)

[Note: subprocess support is not implemented yet, but that's the
plan. Unless it is implemented, and I forgot to remove this note.]

* It sometimes happens that you want to connect to an HTTPS server,
but you have to go through a web proxy... and the proxy also uses
HTTPS. So you end up having to do `SSL-on-top-of-SSL
Expand Down Expand Up @@ -641,10 +638,169 @@ Asynchronous file objects
The underlying synchronous file object.


Subprocesses
------------
.. module:: trio.subprocess
.. _subprocess:

Spawning subprocesses with :mod:`trio.subprocess`
-------------------------------------------------

The :mod:`trio.subprocess` module provides support for spawning
other programs, communicating with them via pipes, sending them signals,
and waiting for them to exit. Its interface is based on the
:mod:`subprocess` module in the standard library; differences
are noted below.

The constants and exceptions from the standard :mod:`subprocess`
module are re-exported by :mod:`trio.subprocess` unchanged.
So, if you like, you can say ``from trio import subprocess``
and continue referring to ``subprocess.PIPE``,
:exc:`subprocess.CalledProcessError`, and so on, in the same
way you would in synchronous code.


.. _subprocess-options:

Options for starting subprocesses
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

The standard :mod:`subprocess` module supports a dizzying array
of `options <https://docs.python.org/3/library/subprocess.html#popen-constructor>`__
for controlling the environment in which a process starts and the
mechanisms used for communicating with it. (If you find that list
overwhelming, you're not alone; you might prefer to start with
just the `frequently used ones
<https://docs.python.org/3/library/subprocess.html#frequently-used-arguments>`__.)

Trio makes use of the :mod:`subprocess` module's logic for spawning processes,
so almost all of these options can be used with their same semantics when
starting subprocesses under Trio. The exceptions are ``encoding``, ``errors``,
``universal_newlines`` (and its 3.7+ alias ``text``), and ``bufsize``;
Trio always uses unbuffered byte streams for communicating with a process,
so these options don't make sense. Text I/O should use a layer
on top of the raw byte streams, just as it does with sockets.
[This layer does not yet exist, but is in the works.]


Running a process and waiting for it to finish
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

We're `working on <https://github.com/python-trio/trio/pull/791>`
figuring out the best API for common higher-level subprocess operations.
In the meantime, you can implement something like the standard library
:func:`subprocess.run` in terms of :class:`trio.subprocess.Process`
as follows::

async def run(
command, *, input=None, capture_output=False, **options
):
if input is not None:
options['stdin'] = subprocess.PIPE
if capture_output:
options['stdout'] = options['stderr'] = subprocess.PIPE

stdout_chunks = []
stderr_chunks = []

async with trio.subprocess.Process(command, **options) as proc:

async def feed_input():
async with proc.stdin:
if input:
try:
await proc.stdin.send_all(input)
except trio.BrokenResourceError:
pass

async def read_output(stream, chunks):
async with stream:
while True:
chunk = await stream.receive_some(32768)
if not chunk:
break
chunks.append(chunk)

async with trio.open_nursery() as nursery:
if proc.stdin is not None:
nursery.start_soon(feed_input)
if proc.stdout is not None:
nursery.start_soon(read_output, proc.stdout, stdout_chunks)
if proc.stderr is not None:
nursery.start_soon(read_output, proc.stderr, stderr_chunks)
await proc.wait()

stdout = b"".join(stdout_chunks) if proc.stdout is not None else None
stderr = b"".join(stderr_chunks) if proc.stderr is not None else None

if proc.returncode:
raise subprocess.CalledProcessError(
proc.returncode, proc.args, output=stdout, stderr=stderr
)
else:
return subprocess.CompletedProcess(
proc.args, proc.returncode, stdout, stderr
)


Interacting with a process as it runs
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

You can spawn a subprocess by creating an instance of
:class:`trio.subprocess.Process` and then interact with it using its
:attr:`~trio.subprocess.Process.stdin`,
:attr:`~trio.subprocess.Process.stdout`, and/or
:attr:`~trio.subprocess.Process.stderr` streams.

.. autoclass:: trio.subprocess.Process
:members:


`Not implemented yet! <https://github.com/python-trio/trio/issues/4>`__
Differences from the standard library
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

* All arguments to the constructor of
:class:`~trio.subprocess.Process`, except the command to run, must be
passed using keywords.

* :func:`~subprocess.call`, :func:`~subprocess.check_call`, and
:func:`~subprocess.check_output` are not provided.

* :meth:`~subprocess.Popen.communicate` is not provided as a method on
:class:`~trio.subprocess.Process` objects; use a higher-level
function instead, or write the loop yourself if
you have unusual needs. :meth:`~subprocess.Popen.communicate` has
quite unusual cancellation behavior in the standard library (on some
platforms it spawns a background thread which continues to read from
the child process even after the timeout has expired) and we wanted
to provide an interface with fewer surprises.

* :meth:`~trio.subprocess.Process.wait` is an async function that does
not take a ``timeout`` argument; combine it with
:func:`~trio.fail_after` if you want a timeout.

* Text I/O is not supported: you may not use the
:class:`~trio.subprocess.Process` constructor arguments
``universal_newlines`` (or its 3.7+ alias ``text``), ``encoding``,
or ``errors``.

* :attr:`~trio.subprocess.Process.stdin` is a :class:`~trio.abc.SendStream` and
:attr:`~trio.subprocess.Process.stdout` and :attr:`~trio.subprocess.Process.stderr`
are :class:`~trio.abc.ReceiveStream`\s, rather than file objects. The
:class:`~trio.subprocess.Process` constructor argument ``bufsize`` is
not supported since there would be no file object to pass it to.

* :meth:`~trio.subprocess.Process.aclose` (and thus also
``__aexit__``) behave like the standard :class:`~subprocess.Popen`
context manager exit (close pipes to the process, then wait for it
to exit), but add additional behavior if cancelled: kill the process
and wait for it to finish terminating. This is useful for scoping
the lifetime of a simple subprocess that doesn't spawn any children
of its own. (For subprocesses that do in turn spawn their own
subprocesses, there is not currently any way to clean up the whole
tree; moreover, using the :class:`Process` context manager in such
cases is likely to be counterproductive as killing the top-level
subprocess leaves it no chance to do any cleanup of its children
that might be desired. You'll probably want to write your own
supervision logic in that case.)


Signals
Expand Down
6 changes: 6 additions & 0 deletions newsfragments/4.feature.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
Initial :ref:`subprocess support <subprocess>`.
Add :class:`trio.subprocess.Process`, an async wrapper around the stdlib
:class:`subprocess.Popen` class, which permits spawning subprocesses
and communicating with them over standard Trio streams.
:mod:`trio.subprocess` also reexports all the stdlib :mod:`subprocess`
exceptions and constants for convenience.
73 changes: 73 additions & 0 deletions notes-to-self/subprocess-notes.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
# subprocesses are a huge hassle
# on Linux there is simply no way to async wait for a child to exit except by
# messing with SIGCHLD and that is ... *such* a mess. Not really
# tenable. We're better off trying os.waitpid(..., os.WNOHANG), and if that
# says the process is still going then spawn a thread to sit in waitpid.
# ......though that waitpid is non-cancellable so ugh. this is a problem,
# becaues it's also mutating -- you only get to waitpid() once, and you have
# to do it, because zombies. I guess we could make sure the waitpid thread is
# daemonic and either it gets back to us eventually (even if our first call to
# 'await wait()' is cancelled, maybe another one won't be), or else we go away
# and don't care anymore.
# I guess simplest is just to spawn a thread at the same time as we spawn the
# process, with more reasonable notification semantics.
# or we can poll every 100 ms or something, sigh.

# on Mac/*BSD then kqueue works, go them. (maybe have WNOHANG after turning it
# on to avoid a race condition I guess)

# on Windows, you can either do the thread thing, or something involving
# WaitForMultipleObjects, or the Job Object API:
# https://stackoverflow.com/questions/17724859/detecting-exit-failure-of-child-processes-using-iocp-c-windows
# (see also the comments here about using the Job Object API:
# https://stackoverflow.com/questions/23434842/python-how-to-kill-child-processes-when-parent-dies/23587108#23587108)
# however the docs say:
# "Note that, with the exception of limits set with the
# JobObjectNotificationLimitInformation information class, delivery of
# messages to the completion port is not guaranteed; failure of a message to
# arrive does not necessarily mean that the event did not occur"
#
# oh windows wtf

# We'll probably want to mess with the job API anyway for worker processes
# (b/c that's the reliable way to make sure we never leave residual worker
# processes around after exiting, see that stackoverflow question again), so
# maybe this isn't too big a hassle? waitpid is probably easiest for the
# first-pass implementation though.

# the handle version has the same issues as waitpid on Linux, except I guess
# that on windows the waitpid equivalent doesn't consume the handle.
# -- wait no, the windows equivalent takes a timeout! and we know our
# cancellation deadline going in, so that's actually okay. (Still need to use
# a thread but whatever.)

# asyncio does RegisterWaitForSingleObject with a callback that does
# PostQueuedCompletionStatus.
# this is just a thread pool in disguise (and in principle could have weird
# problems if you have enough children and run out of threads)
# it's possible we could do something with a thread that just sits in
# an alertable state and handle callbacks...? though hmm, maybe the set of
# events that can notify via callbacks is equivalent to the set that can
# notify via IOCP.
# there's WaitForMultipleObjects to let multiple waits share a thread I
# guess.
# you can wake up a WaitForMultipleObjectsEx on-demand by using QueueUserAPC
# to send a no-op APC to its thread.
# this is also a way to cancel a WaitForSingleObjectEx, actually. So it
# actually is possible to cancel the equivalent of a waitpid on Windows.

# Potentially useful observation: you *can* use a socket as the
# stdin/stdout/stderr for a child, iff you create that socket *without*
# WSA_FLAG_OVERLAPPED:
# http://stackoverflow.com/a/5725609
# Here's ncm's Windows implementation of socketpair, which has a flag to
# control whether one of the sockets has WSA_FLAG_OVERLAPPED set:
# https://github.com/ncm/selectable-socketpair/blob/master/socketpair.c
# (it also uses listen(1) so it's robust against someone intercepting things,
# unlike the version in socket.py... not sure anyone really cares, but
# hey. OTOH it only supports AF_INET, while socket.py supports AF_INET6,
# fancy.)
# (or it would be trivial to (re)implement in python, using either
# socket.socketpair or ncm's version as a model, given a cffi function to
# create the non-overlapped socket in the first place then just pass it into
# the socket.socket constructor (avoiding the dup() that fromfd does).)
1 change: 1 addition & 0 deletions trio/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@
from . import socket
from . import abc
from . import ssl
from . import subprocess
# Not imported by default: testing
if False:
from . import testing
Expand Down
Loading

0 comments on commit c212ce8

Please sign in to comment.