Skip to content
This repository has been archived by the owner on Sep 13, 2024. It is now read-only.

Windows support #17

Merged
merged 12 commits into from
Apr 3, 2017
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Added cross-platform process tracking support
  • Loading branch information
ntninja committed Feb 12, 2017
commit 560586407740826edc9fa0eb8149ac094648f4ec
73 changes: 58 additions & 15 deletions gbulb/glib_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,45 +28,67 @@ class AbstractChildWatcher:
class GLibChildWatcher(AbstractChildWatcher):
def __init__(self):
self._sources = {}
self._handles = {}

# On windows on has to open a process handle for the given PID number
# before it's possible to use GLib's `child_watch_add` on it
if sys.platform == "win32":
def _create_handle_for_pid(self, pid):
import _winapi
return _winapi.OpenProcess(0x00100400, 0, pid)
def _close_process_handle(self, handle):
import _winapi
_winapi.CloseHandle(handle)
else:
_create_handle_for_pid = lambda self, pid: pid
_close_process_handle = lambda self, pid: None

def attach_loop(self, loop):
# just ignored
pass

def add_child_handler(self, pid, callback, *args):
self.remove_child_handler(pid)

source = GLib.child_watch_add(0, pid, self.__callback__)
self._sources[pid] = source, callback, args

handle = self._create_handle_for_pid(pid)
source = GLib.child_watch_add(0, handle, self.__callback__)
self._sources[pid] = source, callback, args, handle
self._handles[handle] = pid

def remove_child_handler(self, pid):
try:
source = self._sources.pop(pid)[0]
source, callback, args, handle = self._sources.pop(pid)
assert self._handles.pop(handle) == pid
except KeyError:
return False


self._close_process_handle(handle)
GLib.source_remove(source)
return True

def close(self):
for source, callback, args in self._sources.values():
for source, callback, args, handle in self._sources.values():
self._close_process_handle(handle)
GLib.source_remove(source)

self._sources = {}
self._handles = {}

def __enter__(self):
return self

def __exit__(self, a, b, c):
pass

def __callback__(self, pid, status):


def __callback__(self, handle, status):
try:
source, callback, args = self._sources.pop(pid)
pid = self._handles.pop(handle)
source, callback, args, handle = self._sources.pop(pid)
except KeyError:
return


self._close_process_handle(handle)
GLib.source_remove(source)

if hasattr(os, "WIFSIGNALED") and os.WIFSIGNALED(status):
returncode = -os.WTERMSIG(status)
elif hasattr(os, "WIFEXITED") and os.WIFEXITED(status):
Expand Down Expand Up @@ -278,7 +300,28 @@ def _make_subprocess_transport(self, protocol, args, shell,
stdin, stdout, stderr, bufsize,
extra=None, **kwargs):
"""Create subprocess transport."""
raise NotImplementedError
with events.get_child_watcher() as watcher:
waiter = self.create_future()
transport = transports.SubprocessTransport(self, protocol, args, shell,
stdin, stdout, stderr, bufsize,
waiter=waiter, extra=extra, **kwargs)

watcher.add_child_handler(transport.get_pid(), self._child_watcher_callback, transport)
try:
yield from waiter
except Exception as exc:
err = exc
else:
err = None
if err is not None:
transport.close()
yield from transport._wait()
raise err

return transport

def _child_watcher_callback(self, pid, returncode, transport):
self.call_soon_threadsafe(transport._process_exited, returncode)

def _write_to_self(self):
self._context.wakeup()
Expand Down
10 changes: 9 additions & 1 deletion gbulb/transports.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import collections
import socket
from asyncio import futures, transports
import subprocess
from asyncio import base_subprocess, futures, transports



Expand Down Expand Up @@ -404,3 +405,10 @@ def _force_close_async(self, exc):
super()._force_close_async(exc)
finally:
self._channel.shutdown(True)


class SubprocessTransport(base_subprocess.BaseSubprocessTransport):
def _start(self, args, shell, stdin, stdout, stderr, bufsize, **kwargs):
self._proc = subprocess.Popen(
args, shell=shell, stdin=stdin, stdout=stdout, stderr=stderr,
bufsize=bufsize, **kwargs)