diff --git a/ansible_mitogen/planner.py b/ansible_mitogen/planner.py index 8ea3886ab..801950f9a 100644 --- a/ansible_mitogen/planner.py +++ b/ansible_mitogen/planner.py @@ -51,7 +51,6 @@ from ansible.plugins import module_loader from ansible.plugins import module_utils_loader -import mitogen import ansible_mitogen.target diff --git a/ansible_mitogen/process.py b/ansible_mitogen/process.py index c4f583104..ff6372990 100644 --- a/ansible_mitogen/process.py +++ b/ansible_mitogen/process.py @@ -30,6 +30,7 @@ import errno import logging import os +import signal import socket import sys @@ -110,7 +111,7 @@ def start(cls): if cls.child_pid: cls.child_sock.close() cls.child_sock = None - cls.worker_sock.recv(1) + mitogen.core.io_op(cls.worker_sock.recv, 1) else: cls.worker_sock.close() cls.worker_sock = None @@ -128,9 +129,9 @@ def worker_main(self): self._setup_services() # Let the parent know our listening socket is ready. - self.child_sock.send('1') + mitogen.core.io_op(self.child_sock.send, '1') # Block until the socket is closed, which happens on parent exit. - self.child_sock.recv(1) + mitogen.core.io_op(self.child_sock.recv, 1) def _setup_master(self): """ @@ -140,6 +141,7 @@ def _setup_master(self): self.router.responder.whitelist_prefix('ansible') self.router.responder.whitelist_prefix('ansible_mitogen') mitogen.core.listen(self.router.broker, 'shutdown', self.on_broker_shutdown) + mitogen.core.listen(self.router.broker, 'exit', self.on_broker_exit) self.listener = mitogen.unix.Listener( router=self.router, path=self.unix_listener_path, @@ -168,14 +170,23 @@ def _setup_services(self): def on_broker_shutdown(self): """ - Respond to the Router shutdown (indirectly triggered through exit of - the main thread) by unlinking the listening socket. Ideally this would - happen explicitly, but Ansible provides no hook to allow it. + Respond to broker shutdown by beginning service pool shutdown. 
Do not + join on the pool yet, since that would block the broker thread which + then cannot clean up pending handlers, which is required for the + threads to exit gracefully. """ - self.pool.stop() + self.pool.stop(join=False) try: os.unlink(self.listener.path) except OSError, e: # Prevent a shutdown race with the parent process. if e.args[0] != errno.ENOENT: raise + + def on_broker_exit(self): + """ + Respond to the broker thread about to exit by sending SIGTERM to + ourself. In future this should gracefully join the pool, but TERM is + fine for now. + """ + os.kill(os.getpid(), signal.SIGTERM) diff --git a/ansible_mitogen/runner.py b/ansible_mitogen/runner.py index e14d26bda..7155aee1e 100644 --- a/ansible_mitogen/runner.py +++ b/ansible_mitogen/runner.py @@ -38,6 +38,7 @@ from __future__ import absolute_import import cStringIO import ctypes +import errno import imp import json import logging @@ -148,7 +149,7 @@ def revert(self): implementation simply restores the original environment. """ self._env.revert() - self._cleanup_temp() + self._try_cleanup_temp() def _cleanup_temp(self): """ @@ -162,6 +163,20 @@ def _cleanup_temp(self): LOG.debug('Deleting %r', path) ansible_mitogen.target.prune_tree(path) + def _try_cleanup_temp(self): + """ + During broker shutdown triggered by async task timeout or loss of + connection to the parent, it is possible for prune_tree() in + target.py::_on_broker_shutdown() to run before _cleanup_temp(), so skip + cleanup if the directory or a file disappears from beneath us. 
+ """ + try: + self._cleanup_temp() + except (IOError, OSError) as e: + if e.args[0] == errno.ENOENT: + return + raise + def _run(self): """ The _run() method is expected to return a dictionary in the form of diff --git a/ansible_mitogen/target.py b/ansible_mitogen/target.py index 9d4d8d3a7..9bf239d3a 100644 --- a/ansible_mitogen/target.py +++ b/ansible_mitogen/target.py @@ -266,75 +266,67 @@ def _get_async_dir(): ) -def _write_job_status(job_id, dct): - """ - Update an async job status file. - """ - LOG.info('_write_job_status(%r, %r)', job_id, dct) - dct.setdefault('ansible_job_id', job_id) - dct.setdefault('data', '') - - async_dir = _get_async_dir() - if not os.path.exists(async_dir): - os.makedirs(async_dir) - - path = os.path.join(async_dir, job_id) - with open(path + '.tmp', 'w') as fp: - fp.write(json.dumps(dct)) - os.rename(path + '.tmp', path) - - -def _sigalrm(broker, timeout_secs, job_id): - """ - Respond to SIGALRM (job timeout) by updating the job file and killing the - process. - """ - msg = "Job reached maximum time limit of %d seconds." % (timeout_secs,) - _write_job_status(job_id, { - "failed": 1, - "finished": 1, - "msg": msg, - }) - broker.shutdown() - - -def _install_alarm(broker, timeout_secs, job_id): - handler = lambda *_: _sigalrm(broker, timeout_secs, job_id) - signal.signal(signal.SIGALRM, handler) - signal.alarm(timeout_secs) - - -def _run_module_async(kwargs, job_id, timeout_secs, econtext): - """ - 1. Immediately updates the status file to mark the job as started. - 2. Installs a timer/signal handler to implement the time limit. - 3. Runs as with run_module(), writing the result to the status file. - - :param dict kwargs: - Runner keyword arguments. - :param str job_id: - String job ID. - :param int timeout_secs: - If >0, limit the task's maximum run time. 
- """ - _write_job_status(job_id, { - 'started': 1, - 'finished': 0, - 'pid': os.getpid() - }) - - if timeout_secs > 0: - _install_alarm(econtext.broker, timeout_secs, job_id) +class AsyncRunner(object): + def __init__(self, job_id, timeout_secs, econtext, kwargs): + self.job_id = job_id + self.timeout_secs = timeout_secs + self.econtext = econtext + self.kwargs = kwargs + self._timed_out = False + self._init_path() + + def _init_path(self): + async_dir = _get_async_dir() + if not os.path.exists(async_dir): + os.makedirs(async_dir) + self.path = os.path.join(async_dir, self.job_id) + + def _update(self, dct): + """ + Update an async job status file. + """ + LOG.info('%r._update(%r, %r)', self, self.job_id, dct) + dct.setdefault('ansible_job_id', self.job_id) + dct.setdefault('data', '') + + with open(self.path + '.tmp', 'w') as fp: + fp.write(json.dumps(dct)) + os.rename(self.path + '.tmp', self.path) + + def _on_sigalrm(self, signum, frame): + """ + Respond to SIGALRM (job timeout) by updating the job file and killing + the process. + """ + msg = "Job reached maximum time limit of %d seconds." % ( + self.timeout_secs, + ) + self._update({ + "failed": 1, + "finished": 1, + "msg": msg, + }) + self._timed_out = True + self.econtext.broker.shutdown() + + def _install_alarm(self): + signal.signal(signal.SIGALRM, self._on_sigalrm) + signal.alarm(self.timeout_secs) + + def _run_module(self): + kwargs = dict(self.kwargs, **{ + 'detach': True, + 'econtext': self.econtext, + 'emulate_tty': False, + }) - kwargs['detach'] = True - kwargs['econtext'] = econtext - kwargs['emulate_tty'] = False - dct = run_module(kwargs) - if mitogen.core.PY3: - for key in 'stdout', 'stderr': - dct[key] = dct[key].decode('utf-8', 'surrogateescape') + dct = run_module(kwargs) + if mitogen.core.PY3: + for key in 'stdout', 'stderr': + dct[key] = dct[key].decode('utf-8', 'surrogateescape') + return dct - try: + def _parse_result(self, dct): filtered, warnings = ( ansible.module_utils.json_utils. 
_filter_non_json_lines(dct['stdout']) @@ -342,34 +334,69 @@ def _run_module_async(kwargs, job_id, timeout_secs, econtext): result = json.loads(filtered) result.setdefault('warnings', []).extend(warnings) result['stderr'] = dct['stderr'] - _write_job_status(job_id, result) - except Exception: - _write_job_status(job_id, { - "failed": 1, - "msg": traceback.format_exc(), - "data": dct['stdout'], # temporary notice only - "stderr": dct['stderr'] + self._update(result) + + def _run(self): + """ + 1. Immediately updates the status file to mark the job as started. + 2. Installs a timer/signal handler to implement the time limit. + 3. Runs as with run_module(), writing the result to the status file. + + :param dict kwargs: + Runner keyword arguments. + :param str job_id: + String job ID. + :param int timeout_secs: + If >0, limit the task's maximum run time. + """ + self._update({ + 'started': 1, + 'finished': 0, + 'pid': os.getpid() }) + if self.timeout_secs > 0: + self._install_alarm() + + dct = self._run_module() + if not self._timed_out: + # After SIGALRM fires, there is a window between broker responding + # to shutdown() by killing the process, and work continuing on the + # main thread. If main thread was asleep in at least + # basic.py/select.select(), an EINTR will be raised. We want to + # discard that exception. + try: + self._parse_result(dct) + except Exception: + self._update({ + "failed": 1, + "msg": traceback.format_exc(), + "data": dct['stdout'], # temporary notice only + "stderr": dct['stderr'] + }) + + def run(self): + try: + try: + self._run() + except Exception: + self._update({ + "failed": 1, + "msg": traceback.format_exc(), + }) + finally: + self.econtext.broker.shutdown() + @mitogen.core.takes_econtext def run_module_async(kwargs, job_id, timeout_secs, econtext): """ - Arrange for a module to be executed with its run status and result - serialized to a disk file. This function expects to run in a child forked - using :func:`create_fork_child`. 
+ Execute a module with its run status and result written to a file, + terminating the process on completion. This function must run in a child + forked using :func:`create_fork_child`. """ - try: - try: - _run_module_async(kwargs, job_id, timeout_secs, econtext) - except Exception: - # Catch any (ansible_mitogen) bugs and write them to the job file. - _write_job_status(job_id, { - "failed": 1, - "msg": traceback.format_exc(), - }) - finally: - econtext.broker.shutdown() + arunner = AsyncRunner(job_id, timeout_secs, econtext, kwargs) + arunner.run() def make_temp_directory(base_dir): diff --git a/mitogen/debug.py b/mitogen/debug.py index 95f7db3e8..312c8d9a4 100644 --- a/mitogen/debug.py +++ b/mitogen/debug.py @@ -48,6 +48,15 @@ _last = None +def enable_evil_interrupts(): + signal.signal(signal.SIGALRM, (lambda a, b: None)) + signal.setitimer(signal.ITIMER_REAL, 0.01, 0.01) + + +def disable_evil_interrupts(): + signal.setitimer(signal.ITIMER_REAL, 0, 0) + + def _hex(n): return '%08x' % n diff --git a/mitogen/service.py b/mitogen/service.py index 62180e333..ffb456495 100644 --- a/mitogen/service.py +++ b/mitogen/service.py @@ -271,9 +271,12 @@ def _run(self): method_name, kwargs, msg = tup try: super(SerializedInvoker, self).invoke(method_name, kwargs, msg) + except mitogen.core.CallError: + e = sys.exc_info()[1] + LOG.warning('%r: call error: %s: %s', self, msg, e) + msg.reply(e) except Exception: - LOG.exception('%r: while invoking %r of %r', - self, method_name, self.service) + LOG.exception('%r: while invoking %s()', self, method_name) msg.reply(mitogen.core.Message.dead()) def invoke(self, method_name, kwargs, msg): @@ -456,9 +459,13 @@ def add(self, service): closed = False - def stop(self): + def stop(self, join=True): self.closed = True self._select.close() + if join: + self.join() + + def join(self): for th in self._threads: th.join() for invoker in self._invoker_by_name.itervalues(): diff --git a/tests/ansible/ansible.cfg b/tests/ansible/ansible.cfg
index 437e67b7e..eeb821099 100644 --- a/tests/ansible/ansible.cfg +++ b/tests/ansible/ansible.cfg @@ -4,6 +4,7 @@ gathering = explicit strategy_plugins = ../../ansible_mitogen/plugins/strategy action_plugins = lib/action callback_plugins = lib/callback +stdout_callback = nice_stdout library = lib/modules # module_utils = lib/module_utils retry_files_enabled = False diff --git a/tests/ansible/integration/async/runner_two_simultaneous_jobs.yml b/tests/ansible/integration/async/runner_two_simultaneous_jobs.yml index 51ceef2fe..9474263bd 100644 --- a/tests/ansible/integration/async/runner_two_simultaneous_jobs.yml +++ b/tests/ansible/integration/async/runner_two_simultaneous_jobs.yml @@ -6,16 +6,20 @@ # Start 2 duplicate jobs, verify they run concurrently. + - file: + path: /tmp/flurp + state: absent + - name: create semaphore file and sleep for 5 seconds. shell: | exec 2>/dev/null; bash -c ' - echo im_alive $$ > /tmp/flurp + echo im_alive $$ > /tmp/flurp; sleep 60; '; rm -f /tmp/flurp; echo alldone - async: 1000 + async: 30 poll: 0 register: job1 @@ -24,30 +28,29 @@ # below compltes quickly. - name: verify semaphore file exists while this job exists. shell: | - [ -f /tmp/flurp ] && { - read im_alive pid < /tmp/flurp - echo $im_alive - kill $pid &>/dev/null - } - async: 1000 + while [ ! 
-f /tmp/flurp ]; do sleep 0.1; done; + read im_alive pid < /tmp/flurp + echo $im_alive + kill $pid &>/dev/null + async: 30 poll: 0 register: job2 - - name: (job1) busy-poll up to 100000 times + - name: (job1) poll async_status: jid: "{{job1.ansible_job_id}}" register: result1 until: result1.finished - retries: 100000 - delay: 0 + retries: 5 + delay: 1 - - name: (job2) busy-poll up to 100000 times + - name: (job2) poll async_status: jid: "{{job2.ansible_job_id}}" register: result2 until: result2.finished - retries: 100000 - delay: 0 + retries: 5 + delay: 1 - assert: that: diff --git a/tests/ansible/lib/callback/nice_stdout.py b/tests/ansible/lib/callback/nice_stdout.py new file mode 100644 index 000000000..08a3757be --- /dev/null +++ b/tests/ansible/lib/callback/nice_stdout.py @@ -0,0 +1,54 @@ +from __future__ import unicode_literals +import io + +try: + from ansible.plugins import callback_loader +except ImportError: + from ansible.plugins.loader import callback_loader + + +def printi(tio, obj, key=None, indent=0): + def write(s, *args): + if args: + s %= args + tio.write(' ' * indent) + if key is not None: + tio.write('%s: ' % (key,)) + tio.write(s) + tio.write('\n') + + if isinstance(obj, (list, tuple)): + write('[') + for i, obj2 in enumerate(obj): + printi(tio, obj2, key=i, indent=indent+1) + key = None + write(']') + elif isinstance(obj, dict): + write('{') + for key2, obj2 in sorted(obj.iteritems()): + if not (key2.startswith('_ansible_') or + key2.endswith('_lines')): + printi(tio, obj2, key=key2, indent=indent+1) + key = None + write('}') + elif isinstance(obj, basestring): + if isinstance(obj, str): + obj = obj.decode('utf-8', 'replace') + for line in obj.splitlines(): + write('%s', line.rstrip('\r\n')) + else: + write('%r', obj) + + +DefaultModule = callback_loader.get('default', class_only=True) + +class CallbackModule(DefaultModule): + def _dump_results(self, result, *args, **kwargs): + try: + tio = io.StringIO() + printi(tio, result) + return 
tio.getvalue().encode('ascii', 'replace') + except: + import traceback + traceback.print_exc() + raise diff --git a/tests/ansible/osx_setup.yml b/tests/ansible/osx_setup.yml index 655c76059..7a6ff23f5 100644 --- a/tests/ansible/osx_setup.yml +++ b/tests/ansible/osx_setup.yml @@ -35,6 +35,7 @@ - has_sudo_pubkey - require_tty - pw_required + - readonly_homedir - require_tty_pw_required - slow_user when: ansible_system != 'Darwin'