Skip to content

Commit

Permalink
Merge pull request #263 from dw/dmw
Browse files Browse the repository at this point in the history
Stray mux process on CTRL+C, EINTR on async task timeout, temp dir cleanup race
  • Loading branch information
dw authored Jun 10, 2018
2 parents 3a2f422 + 9617f4d commit 876a82f
Show file tree
Hide file tree
Showing 10 changed files with 241 additions and 114 deletions.
1 change: 0 additions & 1 deletion ansible_mitogen/planner.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@
from ansible.plugins import module_loader
from ansible.plugins import module_utils_loader

import mitogen
import ansible_mitogen.target


Expand Down
25 changes: 18 additions & 7 deletions ansible_mitogen/process.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import errno
import logging
import os
import signal
import socket
import sys

Expand Down Expand Up @@ -110,7 +111,7 @@ def start(cls):
if cls.child_pid:
cls.child_sock.close()
cls.child_sock = None
cls.worker_sock.recv(1)
mitogen.core.io_op(cls.worker_sock.recv, 1)
else:
cls.worker_sock.close()
cls.worker_sock = None
Expand All @@ -128,9 +129,9 @@ def worker_main(self):
self._setup_services()

# Let the parent know our listening socket is ready.
self.child_sock.send('1')
mitogen.core.io_op(self.child_sock.send, '1')
# Block until the socket is closed, which happens on parent exit.
self.child_sock.recv(1)
mitogen.core.io_op(self.child_sock.recv, 1)

def _setup_master(self):
"""
Expand All @@ -140,6 +141,7 @@ def _setup_master(self):
self.router.responder.whitelist_prefix('ansible')
self.router.responder.whitelist_prefix('ansible_mitogen')
mitogen.core.listen(self.router.broker, 'shutdown', self.on_broker_shutdown)
mitogen.core.listen(self.router.broker, 'exit', self.on_broker_exit)
self.listener = mitogen.unix.Listener(
router=self.router,
path=self.unix_listener_path,
Expand Down Expand Up @@ -168,14 +170,23 @@ def _setup_services(self):

def on_broker_shutdown(self):
"""
Respond to the Router shutdown (indirectly triggered through exit of
the main thread) by unlinking the listening socket. Ideally this would
happen explicitly, but Ansible provides no hook to allow it.
Respond to broker shutdown by beginning service pool shutdown. Do not
join on the pool yet, since that would block the broker thread which
then cannot clean up pending handlers, which is required for the
threads to exit gracefully.
"""
self.pool.stop()
self.pool.stop(join=False)
try:
os.unlink(self.listener.path)
except OSError, e:
# Prevent a shutdown race with the parent process.
if e.args[0] != errno.ENOENT:
raise

def on_broker_exit(self):
"""
Respond to the broker thread about to exit by sending SIGTERM to
ourself. In future this should gracefully join the pool, but TERM is
fine for now.
"""
os.kill(os.getpid(), signal.SIGTERM)
17 changes: 16 additions & 1 deletion ansible_mitogen/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
from __future__ import absolute_import
import cStringIO
import ctypes
import errno
import imp
import json
import logging
Expand Down Expand Up @@ -148,7 +149,7 @@ def revert(self):
implementation simply restores the original environment.
"""
self._env.revert()
self._cleanup_temp()
self._try_cleanup_temp()

def _cleanup_temp(self):
"""
Expand All @@ -162,6 +163,20 @@ def _cleanup_temp(self):
LOG.debug('Deleting %r', path)
ansible_mitogen.target.prune_tree(path)

def _try_cleanup_temp(self):
"""
During broker shutdown triggered by async task timeout or loss of
connection to the parent, it is possible for prune_tree() in
target.py::_on_broker_shutdown() to run before _cleanup_temp(), so skip
cleanup if the directory or a file disappears from beneath us.
"""
try:
self._cleanup_temp()
except (IOError, OSError) as e:
if e.args[0] == errno.ENOENT:
return
raise

def _run(self):
"""
The _run() method is expected to return a dictionary in the form of
Expand Down
203 changes: 115 additions & 88 deletions ansible_mitogen/target.py
Original file line number Diff line number Diff line change
Expand Up @@ -266,110 +266,137 @@ def _get_async_dir():
)


def _write_job_status(job_id, dct):
"""
Update an async job status file.
"""
LOG.info('_write_job_status(%r, %r)', job_id, dct)
dct.setdefault('ansible_job_id', job_id)
dct.setdefault('data', '')

async_dir = _get_async_dir()
if not os.path.exists(async_dir):
os.makedirs(async_dir)

path = os.path.join(async_dir, job_id)
with open(path + '.tmp', 'w') as fp:
fp.write(json.dumps(dct))
os.rename(path + '.tmp', path)


def _sigalrm(broker, timeout_secs, job_id):
"""
Respond to SIGALRM (job timeout) by updating the job file and killing the
process.
"""
msg = "Job reached maximum time limit of %d seconds." % (timeout_secs,)
_write_job_status(job_id, {
"failed": 1,
"finished": 1,
"msg": msg,
})
broker.shutdown()


def _install_alarm(broker, timeout_secs, job_id):
handler = lambda *_: _sigalrm(broker, timeout_secs, job_id)
signal.signal(signal.SIGALRM, handler)
signal.alarm(timeout_secs)


def _run_module_async(kwargs, job_id, timeout_secs, econtext):
"""
1. Immediately updates the status file to mark the job as started.
2. Installs a timer/signal handler to implement the time limit.
3. Runs as with run_module(), writing the result to the status file.
:param dict kwargs:
Runner keyword arguments.
:param str job_id:
String job ID.
:param int timeout_secs:
If >0, limit the task's maximum run time.
"""
_write_job_status(job_id, {
'started': 1,
'finished': 0,
'pid': os.getpid()
})

if timeout_secs > 0:
_install_alarm(econtext.broker, timeout_secs, job_id)
class AsyncRunner(object):
def __init__(self, job_id, timeout_secs, econtext, kwargs):
self.job_id = job_id
self.timeout_secs = timeout_secs
self.econtext = econtext
self.kwargs = kwargs
self._timed_out = False
self._init_path()

def _init_path(self):
async_dir = _get_async_dir()
if not os.path.exists(async_dir):
os.makedirs(async_dir)
self.path = os.path.join(async_dir, self.job_id)

def _update(self, dct):
"""
Update an async job status file.
"""
LOG.info('%r._update(%r, %r)', self, self.job_id, dct)
dct.setdefault('ansible_job_id', self.job_id)
dct.setdefault('data', '')

with open(self.path + '.tmp', 'w') as fp:
fp.write(json.dumps(dct))
os.rename(self.path + '.tmp', self.path)

def _on_sigalrm(self, signum, frame):
"""
Respond to SIGALRM (job timeout) by updating the job file and killing
the process.
"""
msg = "Job reached maximum time limit of %d seconds." % (
self.timeout_secs,
)
self._update({
"failed": 1,
"finished": 1,
"msg": msg,
})
self._timed_out = True
self.econtext.broker.shutdown()

def _install_alarm(self):
signal.signal(signal.SIGALRM, self._on_sigalrm)
signal.alarm(self.timeout_secs)

def _run_module(self):
kwargs = dict(self.kwargs, **{
'detach': True,
'econtext': self.econtext,
'emulate_tty': False,
})

kwargs['detach'] = True
kwargs['econtext'] = econtext
kwargs['emulate_tty'] = False
dct = run_module(kwargs)
if mitogen.core.PY3:
for key in 'stdout', 'stderr':
dct[key] = dct[key].decode('utf-8', 'surrogateescape')
dct = run_module(kwargs)
if mitogen.core.PY3:
for key in 'stdout', 'stderr':
dct[key] = dct[key].decode('utf-8', 'surrogateescape')
return dct

try:
def _parse_result(self, dct):
filtered, warnings = (
ansible.module_utils.json_utils.
_filter_non_json_lines(dct['stdout'])
)
result = json.loads(filtered)
result.setdefault('warnings', []).extend(warnings)
result['stderr'] = dct['stderr']
_write_job_status(job_id, result)
except Exception:
_write_job_status(job_id, {
"failed": 1,
"msg": traceback.format_exc(),
"data": dct['stdout'], # temporary notice only
"stderr": dct['stderr']
self._update(result)

def _run(self):
"""
1. Immediately updates the status file to mark the job as started.
2. Installs a timer/signal handler to implement the time limit.
3. Runs as with run_module(), writing the result to the status file.
:param dict kwargs:
Runner keyword arguments.
:param str job_id:
String job ID.
:param int timeout_secs:
If >0, limit the task's maximum run time.
"""
self._update({
'started': 1,
'finished': 0,
'pid': os.getpid()
})

if self.timeout_secs > 0:
self._install_alarm()

dct = self._run_module()
if not self._timed_out:
# After SIGALRM fires, there is a window between broker responding
# to shutdown() by killing the process, and work continuing on the
# main thread. If main thread was asleep in at least
# basic.py/select.select(), an EINTR will be raised. We want to
# discard that exception.
try:
self._parse_result(dct)
except Exception:
self._update({
"failed": 1,
"msg": traceback.format_exc(),
"data": dct['stdout'], # temporary notice only
"stderr": dct['stderr']
})

def run(self):
try:
try:
self._run()
except Exception:
self._update({
"failed": 1,
"msg": traceback.format_exc(),
})
finally:
self.econtext.broker.shutdown()


@mitogen.core.takes_econtext
def run_module_async(kwargs, job_id, timeout_secs, econtext):
"""
Arrange for a module to be executed with its run status and result
serialized to a disk file. This function expects to run in a child forked
using :func:`create_fork_child`.
Execute a module with its run status and result written to a file,
terminating on the process on completion. This function must run in a child
forked using :func:`create_fork_child`.
"""
try:
try:
_run_module_async(kwargs, job_id, timeout_secs, econtext)
except Exception:
# Catch any (ansible_mitogen) bugs and write them to the job file.
_write_job_status(job_id, {
"failed": 1,
"msg": traceback.format_exc(),
})
finally:
econtext.broker.shutdown()
arunner = AsyncRunner(job_id, timeout_secs, econtext, kwargs)
arunner.run()


def make_temp_directory(base_dir):
Expand Down
9 changes: 9 additions & 0 deletions mitogen/debug.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,15 @@
_last = None


def enable_evil_interrupts():
signal.signal(signal.SIGALRM, (lambda a, b: None))
signal.setitimer(signal.ITIMER_REAL, 0.01, 0.01)


def disable_evil_interrupts():
signal.setitimer(signal.ITIMER_REAL, 0, 0)


def _hex(n):
return '%08x' % n

Expand Down
13 changes: 10 additions & 3 deletions mitogen/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -271,9 +271,12 @@ def _run(self):
method_name, kwargs, msg = tup
try:
super(SerializedInvoker, self).invoke(method_name, kwargs, msg)
except mitogen.core.CallError:
e = sys.exc_info()[1]
LOG.warning('%r: call error: %s: %s', self, msg, e)
msg.reply(e)
except Exception:
LOG.exception('%r: while invoking %r of %r',
self, method_name, self.service)
LOG.exception('%r: while invoking %s()', self, method_name)
msg.reply(mitogen.core.Message.dead())

def invoke(self, method_name, kwargs, msg):
Expand Down Expand Up @@ -456,9 +459,13 @@ def add(self, service):

closed = False

def stop(self):
def stop(self, join=True):
self.closed = True
self._select.close()
if join:
self.join()

def join(self):
for th in self._threads:
th.join()
for invoker in self._invoker_by_name.itervalues():
Expand Down
1 change: 1 addition & 0 deletions tests/ansible/ansible.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ gathering = explicit
strategy_plugins = ../../ansible_mitogen/plugins/strategy
action_plugins = lib/action
callback_plugins = lib/callback
stdout_callback = nice_stdout
library = lib/modules
# module_utils = lib/module_utils
retry_files_enabled = False
Expand Down
Loading

0 comments on commit 876a82f

Please sign in to comment.