diff --git a/.ci/azure-pipelines-steps.yml b/.ci/azure-pipelines-steps.yml new file mode 100644 index 000000000..a377d795c --- /dev/null +++ b/.ci/azure-pipelines-steps.yml @@ -0,0 +1,20 @@ + +parameters: + name: '' + pool: '' + sign: false + +steps: +- task: UsePythonVersion@0 + inputs: + versionSpec: '$(python.version)' + architecture: 'x64' + +- script: .ci/prep_azure.py + displayName: "Install requirements." + +- script: .ci/$(MODE)_install.py + displayName: "Install requirements." + +- script: .ci/$(MODE)_tests.py + displayName: Run tests. diff --git a/.ci/azure-pipelines.yml b/.ci/azure-pipelines.yml index fbbb9640c..dc5f71622 100644 --- a/.ci/azure-pipelines.yml +++ b/.ci/azure-pipelines.yml @@ -5,79 +5,85 @@ jobs: -- job: 'MitogenTests' +- job: Mac + steps: + - template: azure-pipelines-steps.yml pool: - vmImage: 'Ubuntu 16.04' + vmImage: macOS-10.13 strategy: matrix: - Mitogen27Debian_27: + Mito27_27: python.version: '2.7' MODE: mitogen - DISTRO: debian - MitogenPy27CentOS6_26: + +- job: Linux + pool: + vmImage: "Ubuntu 16.04" + steps: + - template: azure-pipelines-steps.yml + strategy: + matrix: + # + # Confirmed working + # + Mito27Debian_27: python.version: '2.7' MODE: mitogen - DISTRO: centos6 + DISTRO: debian - #Py26CentOS7: + #MitoPy27CentOS6_26: #python.version: '2.7' #MODE: mitogen #DISTRO: centos6 - Mitogen36CentOS6_26: + Mito36CentOS6_26: python.version: '3.6' MODE: mitogen DISTRO: centos6 - DebOps_2460_27_27: - python.version: '2.7' - MODE: debops_common - VER: 2.4.6.0 - - DebOps_262_36_27: - python.version: '3.6' - MODE: debops_common - VER: 2.6.2 - - Ansible_2460_26: - python.version: '2.7' - MODE: ansible - VER: 2.4.6.0 + # + # + # - Ansible_262_26: - python.version: '2.7' - MODE: ansible - VER: 2.6.2 + #Py26CentOS7: + #python.version: '2.7' + #MODE: mitogen + #DISTRO: centos6 - Ansible_2460_36: - python.version: '3.6' - MODE: ansible - VER: 2.4.6.0 + #DebOps_2460_27_27: + #python.version: '2.7' + #MODE: debops_common + #VER: 2.4.6.0 - Ansible_262_36: - python.version: '3.6' - MODE: ansible - VER: 2.6.2 + #DebOps_262_36_27: + #python.version: '3.6' + #MODE: debops_common + #VER: 2.6.2 - Vanilla_262_27: - python.version: '2.7' - MODE: ansible - VER: 2.6.2 - DISTROS: debian - STRATEGY: linear + #Ansible_2460_26: + #python.version: '2.7' + #MODE: ansible + #VER: 2.4.6.0 - steps: - - task: UsePythonVersion@0 - inputs: - versionSpec: '$(python.version)' - architecture: 'x64' + #Ansible_262_26: + #python.version: '2.7' + #MODE: ansible + #VER: 2.6.2 - - script: .ci/prep_azure.py - displayName: "Install requirements." + #Ansible_2460_36: + #python.version: '3.6' + #MODE: ansible + #VER: 2.4.6.0 - - script: .ci/$(MODE)_install.py - displayName: "Install requirements." + #Ansible_262_36: + #python.version: '3.6' + #MODE: ansible + #VER: 2.6.2 - - script: .ci/$(MODE)_tests.py - displayName: Run tests. + #Vanilla_262_27: + #python.version: '2.7' + #MODE: ansible + #VER: 2.6.2 + #DISTROS: debian + #STRATEGY: linear diff --git a/.ci/ci_lib.py b/.ci/ci_lib.py index 10e9d11ee..e1cb84d5e 100644 --- a/.ci/ci_lib.py +++ b/.ci/ci_lib.py @@ -43,6 +43,18 @@ def subprocess__check_output(*popenargs, **kwargs): subprocess.check_output = subprocess__check_output +# ------------------ + +def have_apt(): + proc = subprocess.Popen('apt --help >/dev/null 2>/dev/null', shell=True) + return proc.wait() == 0 + + +def have_docker(): + proc = subprocess.Popen('docker info >/dev/null 2>/dev/null', shell=True) + return proc.wait() == 0 + + # ----------------- # Force stdout FD 1 to be a pipe, so tools like pip don't spam progress bars. @@ -134,6 +146,17 @@ def __exit__(self, _1, _2, _3): BASE_PORT = 2200 TMP = TempDir().path + +# We copy this out of the way to avoid random stuff modifying perms in the Git +# tree (like git pull). +src_key_file = os.path.join(GIT_ROOT, + 'tests/data/docker/mitogen__has_sudo_pubkey.key') +key_file = os.path.join(TMP, + 'mitogen__has_sudo_pubkey.key') +shutil.copyfile(src_key_file, key_file) +os.chmod(key_file, int('0600', 8)) + + os.environ['PYTHONDONTWRITEBYTECODE'] = 'x' os.environ['PYTHONPATH'] = '%s:%s' % ( os.environ.get('PYTHONPATH', ''), @@ -153,7 +176,7 @@ def image_for_distro(distro): return 'mitogen/%s-test' % (distro.partition('-')[0],) -def make_containers(): +def make_containers(name_prefix='', port_offset=0): docker_hostname = get_docker_hostname() firstbit = lambda s: (s+'-').split('-')[0] secondbit = lambda s: (s+'-').split('-')[1] @@ -171,9 +194,9 @@ def make_containers(): for x in range(count): lst.append({ "distro": firstbit(distro), - "name": "target-%s-%s" % (distro, i), + "name": name_prefix + ("target-%s-%s" % (distro, i)), "hostname": docker_hostname, - "port": BASE_PORT + i, + "port": BASE_PORT + i + port_offset, "python_path": ( '/usr/bin/python3' if secondbit(distro) == 'py3' @@ -195,6 +218,8 @@ def start_containers(containers): "docker run " "--rm " "--detach " + "--privileged " + "--cap-add=SYS_PTRACE " "--publish 0.0.0.0:%(port)s:22/tcp " "--hostname=%(name)s " "--name=%(name)s " diff --git a/.ci/debops_common_tests.py b/.ci/debops_common_tests.py index 8e9f29537..b0e2e4e81 100755 --- a/.ci/debops_common_tests.py +++ b/.ci/debops_common_tests.py @@ -2,6 +2,7 @@ from __future__ import print_function import os +import shutil import ci_lib @@ -10,17 +11,13 @@ ci_lib.DISTROS = ['debian'] * ci_lib.TARGET_COUNT project_dir = os.path.join(ci_lib.TMP, 'project') -key_file = os.path.join( - ci_lib.GIT_ROOT, - 'tests/data/docker/mitogen__has_sudo_pubkey.key', -) vars_path = 'ansible/inventory/group_vars/debops_all_hosts.yml' inventory_path = 'ansible/inventory/hosts' docker_hostname = ci_lib.get_docker_hostname() with ci_lib.Fold('docker_setup'): - containers = ci_lib.make_containers() + containers = ci_lib.make_containers(port_offset=500, name_prefix='debops-') ci_lib.start_containers(containers) @@ -36,7 +33,6 @@ % (ci_lib.GIT_ROOT,) ) - ci_lib.run('chmod go= %s', key_file) with open(vars_path, 'w') as fp: fp.write( "ansible_python_interpreter: /usr/bin/python2.7\n" @@ -47,7 +43,7 @@ "\n" # Speed up slow DH generation. "dhparam__bits: ['128', '64']\n" - % (key_file,) + % (ci_lib.key_file,) ) with open(inventory_path, 'a') as fp: diff --git a/.ci/mitogen_install.py b/.ci/mitogen_install.py index 10813b551..72bc75e3c 100755 --- a/.ci/mitogen_install.py +++ b/.ci/mitogen_install.py @@ -6,10 +6,12 @@ [ 'pip install "pycparser<2.19" "idna<2.7"', 'pip install -r tests/requirements.txt', - ], - [ - 'docker pull %s' % (ci_lib.image_for_distro(ci_lib.DISTRO),), ] ] +if ci_lib.have_docker(): + batches.append([ + 'docker pull %s' % (ci_lib.image_for_distro(ci_lib.DISTRO),), + ]) + ci_lib.run_batches(batches) diff --git a/.ci/mitogen_tests.py b/.ci/mitogen_tests.py index 4ba796c27..36928ac9e 100755 --- a/.ci/mitogen_tests.py +++ b/.ci/mitogen_tests.py @@ -11,4 +11,7 @@ 'SKIP_ANSIBLE': '1', }) +if not ci_lib.have_docker(): + os.environ['SKIP_DOCKER_TESTS'] = '1' + ci_lib.run('./run_tests -v') diff --git a/.ci/prep_azure.py b/.ci/prep_azure.py index 10126df2a..5199a87e0 100755 --- a/.ci/prep_azure.py +++ b/.ci/prep_azure.py @@ -1,22 +1,30 @@ #!/usr/bin/env python +import os +import sys + import ci_lib batches = [] -batches.append([ - 'echo force-unsafe-io | sudo tee /etc/dpkg/dpkg.cfg.d/nosync', - 'sudo add-apt-repository ppa:deadsnakes/ppa', - 'sudo apt-get update', - 'sudo apt-get -y install python2.6 python2.6-dev libsasl2-dev libldap2-dev', -]) - -batches.append([ - 'pip install -r dev_requirements.txt', -]) - -batches.extend( - ['docker pull %s' % (ci_lib.image_for_distro(distro),)] - for distro in ci_lib.DISTROS -) + +if ci_lib.have_apt(): + batches.append([ + 'echo force-unsafe-io | sudo tee /etc/dpkg/dpkg.cfg.d/nosync', + 'sudo add-apt-repository ppa:deadsnakes/ppa', + 'sudo apt-get update', + 'sudo apt-get -y install python2.6 python2.6-dev libsasl2-dev libldap2-dev', + ]) + + +#batches.append([ + #'pip install -r dev_requirements.txt', +#]) + +if ci_lib.have_docker(): + batches.extend( + ['docker pull %s' % (ci_lib.image_for_distro(distro),)] + for distro in ci_lib.DISTROS + ) + ci_lib.run_batches(batches) diff --git a/.ci/soak/debops_common.sh b/.ci/soak/debops_common.sh new file mode 100755 index 000000000..eefb4917c --- /dev/null +++ b/.ci/soak/debops_common.sh @@ -0,0 +1,16 @@ +#!/bin/bash + +export NOCOVERAGE=1 + +# Make Docker containers once. +/usr/bin/time -v ./.ci/debops_common_tests.py "$@" || break +export KEEP=1 + +i=0 +while : +do + i=$((i + 1)) + /usr/bin/time -v ./.ci/debops_common_tests.py "$@" || break +done + +echo $i diff --git a/.ci/soak/mitogen.sh b/.ci/soak/mitogen.sh new file mode 100755 index 000000000..15d625295 --- /dev/null +++ b/.ci/soak/mitogen.sh @@ -0,0 +1,17 @@ +#!/bin/bash + +export NOCOVERAGE=1 +export DISTROS="debian*4" + +# Make Docker containers once. +/usr/bin/time -v ./.ci/ansible_tests.py "$@" +export KEEP=1 + +i=0 +while : +do + i=$((i + 1)) + /usr/bin/time -v ./.ci/ansible_tests.py "$@" || break +done + +echo $i diff --git a/.ci/soak/mitogen_py24.sh b/.ci/soak/mitogen_py24.sh new file mode 100755 index 000000000..475e08759 --- /dev/null +++ b/.ci/soak/mitogen_py24.sh @@ -0,0 +1,12 @@ +#!/bin/bash + +export NOCOVERAGE=1 + +i=0 +while : +do + i=$((i + 1)) + /usr/bin/time -v ./.ci/mitogen_py24_tests.py "$@" || break +done + +echo $i diff --git a/.travis.yml b/.travis.yml index aee14c00f..921ad12bc 100644 --- a/.travis.yml +++ b/.travis.yml @@ -27,9 +27,7 @@ matrix: # 2.4 -> 2.4 - language: c env: MODE=mitogen_py24 DISTRO=centos5 - # 2.7 -> 2.7 - - python: "2.7" - env: MODE=mitogen DISTRO=debian + # 2.7 -> 2.7 -- moved to Azure # 2.7 -> 2.6 #- python: "2.7" #env: MODE=mitogen DISTRO=centos6 @@ -39,9 +37,7 @@ matrix: # 2.6 -> 3.5 - python: "2.6" env: MODE=mitogen DISTRO=debian-py3 - # 3.6 -> 2.6 - - python: "3.6" - env: MODE=mitogen DISTRO=centos6 + # 3.6 -> 2.6 -- moved to Azure # Debops tests. # 2.4.6.0; 2.7 -> 2.7 diff --git a/ansible_mitogen/compat/__init__.py b/ansible_mitogen/compat/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/ansible_mitogen/connection.py b/ansible_mitogen/connection.py index 254a42866..bd4330ff2 100644 --- a/ansible_mitogen/connection.py +++ b/ansible_mitogen/connection.py @@ -33,7 +33,6 @@ import logging import os import pprint -import random import stat import sys import time @@ -356,6 +355,7 @@ def _connect_mitogen_doas(spec): 'machinectl': _connect_machinectl, 'setns': _connect_setns, 'ssh': _connect_ssh, + 'smart': _connect_ssh, # issue #548. 'su': _connect_su, 'sudo': _connect_sudo, 'doas': _connect_doas, @@ -702,35 +702,6 @@ def get_good_temp_dir(self): self._connect() return self.init_child_result['good_temp_dir'] - def _generate_tmp_path(self): - return os.path.join( - self.get_good_temp_dir(), - 'ansible_mitogen_action_%016x' % ( - random.getrandbits(8*8), - ) - ) - - def _make_tmp_path(self): - assert getattr(self._shell, 'tmpdir', None) is None - self._shell.tmpdir = self._generate_tmp_path() - LOG.debug('Temporary directory: %r', self._shell.tmpdir) - self.get_chain().call_no_reply(os.mkdir, self._shell.tmpdir) - return self._shell.tmpdir - - def _reset_tmp_path(self): - """ - Called by _mitogen_reset(); ask the remote context to delete any - temporary directory created for the action. CallChain is not used here - to ensure exception is logged by the context on failure, since the - CallChain itself is about to be destructed. - """ - if getattr(self._shell, 'tmpdir', None) is not None: - self.context.call_no_reply( - ansible_mitogen.target.prune_tree, - self._shell.tmpdir, - ) - self._shell.tmpdir = None - def _connect(self): """ Establish a connection to the master process's UNIX listener socket, @@ -762,7 +733,6 @@ def _mitogen_reset(self, mode): if not self.context: return - self._reset_tmp_path() self.chain.reset() self.parent.call_service( service_name='ansible_mitogen.services.ContextService', diff --git a/ansible_mitogen/logging.py b/ansible_mitogen/logging.py index 1c439be8c..ce6f16591 100644 --- a/ansible_mitogen/logging.py +++ b/ansible_mitogen/logging.py @@ -40,6 +40,25 @@ display = Display() +#: The process name set via :func:`set_process_name`. +_process_name = None + +#: The PID of the process that last called :func:`set_process_name`, so its +#: value can be ignored in unknown fork children. +_process_pid = None + + +def set_process_name(name): + """ + Set a name to adorn log messages with. + """ + global _process_name + _process_name = name + + global _process_pid + _process_pid = os.getpid() + + class Handler(logging.Handler): """ Use Mitogen's log format, but send the result to a Display method. @@ -65,7 +84,12 @@ def emit(self, record): if mitogen_name in self.NOISY_LOGGERS and record.levelno >= logging.WARNING: record.levelno = logging.DEBUG - s = '[pid %d] %s' % (os.getpid(), self.format(record)) + if _process_pid == os.getpid(): + process_name = _process_name + else: + process_name = '?' + + s = '[%-4s %d] %s' % (process_name, os.getpid(), self.format(record)) if record.levelno >= logging.ERROR: display.error(s, wrap_text=False) elif record.levelno >= logging.WARNING: diff --git a/ansible_mitogen/mixins.py b/ansible_mitogen/mixins.py index 5f51cc6fa..890467fd5 100644 --- a/ansible_mitogen/mixins.py +++ b/ansible_mitogen/mixins.py @@ -30,6 +30,7 @@ import logging import os import pwd +import random import traceback try: @@ -173,26 +174,48 @@ def _is_pipelining_enabled(self, module_style, wrap_async=False): """ assert False, "_is_pipelining_enabled() should never be called." + def _generate_tmp_path(self): + return os.path.join( + self._connection.get_good_temp_dir(), + 'ansible_mitogen_action_%016x' % ( + random.getrandbits(8*8), + ) + ) + + def _generate_tmp_path(self): + return os.path.join( + self._connection.get_good_temp_dir(), + 'ansible_mitogen_action_%016x' % ( + random.getrandbits(8*8), + ) + ) + def _make_tmp_path(self, remote_user=None): """ - Return the directory created by the Connection instance during - connection. + Create a temporary subdirectory as a child of the temporary directory + managed by the remote interpreter. """ LOG.debug('_make_tmp_path(remote_user=%r)', remote_user) - return self._connection._make_tmp_path() + path = self._generate_tmp_path() + LOG.debug('Temporary directory: %r', path) + self._connection.get_chain().call_no_reply(os.mkdir, path) + self._connection._shell.tmpdir = path + return path def _remove_tmp_path(self, tmp_path): """ - Stub out the base implementation's invocation of rm -rf, replacing it - with nothing, as the persistent interpreter automatically cleans up - after itself without introducing roundtrips. + Replace the base implementation's invocation of rm -rf, replacing it + with a pipelined call to :func:`ansible_mitogen.target.prune_tree`. """ - # The actual removal is pipelined by Connection.close(). LOG.debug('_remove_tmp_path(%r)', tmp_path) - # Upstream _remove_tmp_path resets shell.tmpdir here, however - # connection.py uses that as the sole location of the temporary - # directory, if one exists. - # self._connection._shell.tmpdir = None + if tmp_path is None and ansible.__version__ > '2.6': + tmp_path = self._connection._shell.tmpdir # 06f73ad578d + if tmp_path is not None: + self._connection.get_chain().call_no_reply( + ansible_mitogen.target.prune_tree, + tmp_path, + ) + self._connection._shell.tmpdir = None def _transfer_data(self, remote_path, data): """ @@ -331,7 +354,7 @@ def _execute_module(self, module_name=None, module_args=None, tmp=None, self._temp_file_gibberish(module_args, wrap_async) self._connection._connect() - return ansible_mitogen.planner.invoke( + result = ansible_mitogen.planner.invoke( ansible_mitogen.planner.Invocation( action=self, connection=self._connection, @@ -345,6 +368,14 @@ def _execute_module(self, module_name=None, module_args=None, tmp=None, ) ) + if ansible.__version__ < '2.5' and delete_remote_tmp and \ + getattr(self._connection._shell, 'tmpdir', None) is not None: + # Built-in actions expected tmpdir to be cleaned up automatically + # on _execute_module(). + self._remove_tmp_path(self._connection._shell.tmpdir) + + return result + def _postprocess_response(self, result): """ Apply fixups mimicking ActionBase._execute_module(); this is copied diff --git a/ansible_mitogen/process.py b/ansible_mitogen/process.py index d7f364966..e4e61e8bc 100644 --- a/ansible_mitogen/process.py +++ b/ansible_mitogen/process.py @@ -185,19 +185,21 @@ def start(cls, _init_logging=True): cls.profiling = os.environ.get('MITOGEN_PROFILING') is not None if cls.profiling: mitogen.core.enable_profiling() + if _init_logging: + ansible_mitogen.logging.setup() cls.original_env = dict(os.environ) cls.child_pid = os.fork() - if _init_logging: - ansible_mitogen.logging.setup() if cls.child_pid: save_pid('controller') + ansible_mitogen.logging.set_process_name('top') ansible_mitogen.affinity.policy.assign_controller() cls.child_sock.close() cls.child_sock = None mitogen.core.io_op(cls.worker_sock.recv, 1) else: save_pid('mux') + ansible_mitogen.logging.set_process_name('mux') ansible_mitogen.affinity.policy.assign_muxprocess() cls.worker_sock.close() cls.worker_sock = None diff --git a/ansible_mitogen/runner.py b/ansible_mitogen/runner.py index 04c70e78c..30c36be75 100644 --- a/ansible_mitogen/runner.py +++ b/ansible_mitogen/runner.py @@ -40,6 +40,7 @@ import codecs import imp import os +import re import shlex import shutil import sys @@ -806,11 +807,20 @@ def _get_program_filename(self): def _setup_args(self): pass + # issue #555: in old times it was considered good form to reload sys and + # change the default encoding. This hack was removed from Ansible long ago, + # but not before permeating into many third party modules. + PREHISTORIC_HACK_RE = re.compile( + b(r'reload\s*\(\s*sys\s*\)\s*' + r'sys\s*\.\s*setdefaultencoding\([^)]+\)') + ) + def _setup_program(self): - self.source = ansible_mitogen.target.get_small_file( + source = ansible_mitogen.target.get_small_file( context=self.service_context, path=self.path, ) + self.source = self.PREHISTORIC_HACK_RE.sub(b(''), source) def _get_code(self): try: diff --git a/ansible_mitogen/strategy.py b/ansible_mitogen/strategy.py index 50486841d..ba0ff5251 100644 --- a/ansible_mitogen/strategy.py +++ b/ansible_mitogen/strategy.py @@ -108,6 +108,7 @@ def wrap_worker__run(*args, **kwargs): if mitogen.core._profile_hook.__name__ != '_profile_hook': signal.signal(signal.SIGTERM, signal.SIG_IGN) + ansible_mitogen.logging.set_process_name('task') ansible_mitogen.affinity.policy.assign_worker() return mitogen.core._profile_hook('WorkerProcess', lambda: worker__run(*args, **kwargs) diff --git a/ansible_mitogen/target.py b/ansible_mitogen/target.py index 809165daa..40e5c57b0 100644 --- a/ansible_mitogen/target.py +++ b/ansible_mitogen/target.py @@ -260,14 +260,6 @@ def prune_tree(path): LOG.error('prune_tree(%r): %s', path, e) -def _on_broker_shutdown(): - """ - Respond to broker shutdown (graceful termination by parent, or loss of - connection to parent) by deleting our sole temporary directory. - """ - prune_tree(temp_dir) - - def is_good_temp_dir(path): """ Return :data:`True` if `path` can be used as a temporary directory, logging diff --git a/docs/ansible.rst b/docs/ansible.rst index f77880118..22b1433f7 100644 --- a/docs/ansible.rst +++ b/docs/ansible.rst @@ -5,13 +5,14 @@ Mitogen for Ansible .. image:: images/ansible/ansible_mitogen.svg :class: mitogen-right-180 mitogen-logo-wrap -An extension to `Ansible`_ is included that implements connections over -Mitogen, replacing embedded shell invocations with pure-Python equivalents -invoked via highly efficient remote procedure calls to persistent interpreters -tunnelled over SSH. No changes are required to target hosts. +**Mitogen for Ansible** is a completely redesigned UNIX connection layer and +module runtime for `Ansible`_. Requiring minimal configuration changes, it +updates Ansible's slow and wasteful shell-centic implementation with +pure-Python equivalents, invoked via highly efficient remote procedure calls to +persistent interpreters tunnelled over SSH. No changes are required to target +hosts. -The extension is stable and real-world use is encouraged. `Bug reports`_ are -welcome: Ansible is huge, and only wide testing will ensure soundness. +The extension is considered stable and real-world use is encouraged. .. _Ansible: https://www.ansible.com/ @@ -56,7 +57,7 @@ write files. Installation ------------ -1. Thoroughly review :ref:`noteworthy_differences` and :ref:`known_issues`. +1. Review :ref:`noteworthy_differences`. 2. Download and extract |mitogen_url|. 3. Modify ``ansible.cfg``: @@ -142,13 +143,29 @@ Testimonials Noteworthy Differences ---------------------- -* Ansible 2.3-2.7 are supported along with Python 2.6, 2.7 or 3.6. Verify your - installation is running one of these versions by checking ``ansible +* Ansible 2.3-2.7 are supported along with Python 2.6, 2.7, 3.6 and 3.7. Verify + your installation is running one of these versions by checking ``ansible --version`` output. -* The Ansible ``raw`` action executes as a regular Mitogen connection, - precluding its use for installing Python on a target. This will be addressed - soon. +* The ``raw`` action executes as a regular Mitogen connection, which requires + Python on the target, precluding its use for installing Python. This will be + addressed in a future release. For now, simply mix Mitogen and vanilla + Ansible strategies: + + .. code-block:: yaml + + - hosts: web-servers + strategy: linear + tasks: + - name: Install Python if necessary. + raw: test -e /usr/bin/python || apt install -y python-minimal + + - hosts: web-servers + strategy: mitogen_linear + roles: + - nginx + - initech_app + - y2k_fix * The ``doas``, ``su`` and ``sudo`` become methods are available. File bugs to register interest in more. @@ -165,43 +182,70 @@ Noteworthy Differences :ref:`mitogen_su `, :ref:`mitogen_sudo `, and :ref:`setns ` types. File bugs to register interest in others. -* Local commands execute in a reuseable interpreter created identically to - interpreters on targets. Presently one interpreter per ``become_user`` - exists, and so only one local action may execute simultaneously. +* Actions are single-threaded for each `(host, user account)` combination, + including actions that execute on the local machine. Playbooks may experience + slowdown compared to vanilla Ansible if they employ long-running + ``local_action`` or ``delegate_to`` tasks delegating many target hosts to a + single machine and user account. Ansible usually permits up to ``forks`` simultaneous local actions. Any long-running local actions that execute for every target will experience artificial serialization, causing slowdown equivalent to `task_duration * - num_targets`. This will be fixed soon. + num_targets`. This will be addressed soon. -* "Module Replacer" style modules are not supported. These rarely appear in - practice, and light web searches failed to reveal many examples of them. +* The Ansible 2.7 `reboot + `_ module + may require a ``pre_reboot_delay`` on systemd hosts, as insufficient time + exists for the reboot command's exit status to be reported before necessary + processes are torn down. + +* On OS X when a SSH password is specified and the default connection type of + ``smart`` is used, Ansible may select the Paramiko plug-in rather than + Mitogen. If you specify a password on OS X, ensure ``connection: ssh`` + appears in your playbook, ``ansible.cfg``, or as ``-c ssh`` on the + command-line. * Ansible permits up to ``forks`` connections to be setup in parallel, whereas in Mitogen this is handled by a fixed-size thread pool. Up to 32 connections may be established in parallel by default, this can be modified by setting the ``MITOGEN_POOL_SIZE`` environment variable. -* The ``ansible_python_interpreter`` variable is parsed using a restrictive - :mod:`shell-like ` syntax, permitting values such as ``/usr/bin/env - FOO=bar python``, which occur in practice. Ansible `documents this - `_ - as an absolute path, however the implementation passes it unquoted through - the shell, permitting arbitrary code to be injected. - -* Performance does not scale linearly with target count. This will improve over +* Performance does not scale cleanly with target count. This will improve over time. -* SSH and ``become`` are treated distinctly when applying timeouts, and - timeouts apply up to the point when the new interpreter is ready to accept - messages. Ansible has two timeouts: ``ConnectTimeout`` for SSH, applying up - to when authentication completes, and a separate parallel timeout up to when - ``become`` authentication completes. - - For busy targets, Ansible may successfully execute a module where Mitogen - would fail without increasing the timeout. For sick targets, Ansible may hang - indefinitely after authentication without executing a command, for example - due to a stuck filesystem IO appearing in ``$HOME/.profile``. +* Performance on Python 3 is significantly worse than on Python 2. While this + has not yet been investigated, at least some of the regression appears to be + part of the core library, and should therefore be straightforward to fix as + part of 0.2.x. + +.. + * SSH and ``become`` are treated distinctly when applying timeouts, and + timeouts apply up to the point when the new interpreter is ready to accept + messages. Ansible has two timeouts: ``ConnectTimeout`` for SSH, applying up + to when authentication completes, and a separate parallel timeout up to + when ``become`` authentication completes. + For busy targets, Ansible may successfully execute a module where Mitogen + would fail without increasing the timeout. For sick targets, Ansible may + hang indefinitely after authentication without executing a command, for + example due to a stuck filesystem IO appearing in ``$HOME/.profile``. + +.. + * "Module Replacer" style modules are not supported. These rarely appear in + practice, and light web searches failed to reveal many examples of them. + +.. + * The ``ansible_python_interpreter`` variable is parsed using a restrictive + :mod:`shell-like ` syntax, permitting values such as ``/usr/bin/env + FOO=bar python``, which occur in practice. Ansible `documents this + `_ + as an absolute path, however the implementation passes it unquoted through + the shell, permitting arbitrary code to be injected. + +.. + * Configurations will break that rely on the `hashbang argument splitting + behaviour `_ of the + ``ansible_python_interpreter`` setting, contrary to the Ansible + documentation. This will be addressed in a future 0.2 release. New Features & Notes @@ -252,8 +296,8 @@ container. ``ansible_password``, or ``ansible_become_pass`` inventory variables. * Automatic tunnelling of SSH-dependent actions, such as the - ``synchronize`` module, is not yet supported. This will be added in the - 0.3 series. + ``synchronize`` module, is not yet supported. This will be addressed in a + future release. To enable connection delegation, set ``mitogen_via=`` on the command line, or as host and group variables. diff --git a/docs/api.rst b/docs/api.rst index 3fd70bea5..db39ad998 100644 --- a/docs/api.rst +++ b/docs/api.rst @@ -579,6 +579,10 @@ Select Class .. module:: mitogen.select .. currentmodule:: mitogen.select + +.. autoclass:: Event + :members: + .. autoclass:: Select :members: @@ -605,6 +609,14 @@ Broker Class :members: +Fork Safety +=========== + +.. currentmodule:: mitogen.os_fork +.. autoclass:: Corker + :members: + + Utility Functions ================= diff --git a/docs/changelog.rst b/docs/changelog.rst index f7ecadbd5..dc34ac877 100644 --- a/docs/changelog.rst +++ b/docs/changelog.rst @@ -15,114 +15,87 @@ Release Notes -.. _known_issues: +v0.2.7 (unreleased) +------------------- -Known Issues ------------- +To avail of fixes in an unreleased version, please download a ZIP file +`directly from GitHub `_. -Mitogen For Ansible -~~~~~~~~~~~~~~~~~~~ +Fixes +~~~~~ + +*(none yet)* + + +v0.2.6 (2019-03-06) +------------------- + +Fixes +~~~~~ -* The Ansible 2.7 `reboot - `_ module - may require a ``pre_reboot_delay`` on systemd hosts, as insufficient time - exists for the reboot command's exit status to be reported before necessary - processes are torn down. - -* On OS X when a SSH password is specified and the default connection type of - ``smart`` is used, Ansible may select the Paramiko plug-in rather than - Mitogen. If you specify a password on OS X, ensure ``connection: ssh`` - appears in your playbook, ``ansible.cfg``, or as ``-c ssh`` on the - command-line. - -* The ``raw`` action executes as a regular Mitogen connection, which requires - Python on the target, precluding its use for installing Python. This will be - addressed in a future 0.2 release. For now, simply mix Mitogen and vanilla - Ansible strategies in your playbook: - - .. code-block:: yaml - - - hosts: web-servers - strategy: linear - tasks: - - name: Install Python if necessary. - raw: test -e /usr/bin/python || apt install -y python-minimal - - - hosts: web-servers - strategy: mitogen_linear - roles: - - nginx - - initech_app - - y2k_fix - -.. * When running with ``-vvv``, log messages will be printed to the console - *after* the Ansible run completes, as connection multiplexer shutdown only - begins after Ansible exits. This is due to a lack of suitable shutdown hook - in Ansible, and is fairly harmless, albeit cosmetically annoying. A future - release may include a solution. - -.. * Configurations will break that rely on the `hashbang argument splitting - behaviour `_ of the - ``ansible_python_interpreter`` setting, contrary to the Ansible - documentation. This will be addressed in a future 0.2 release. - -* Performance does not scale linearly with target count. This requires - significant additional work, as major bottlenecks exist in the surrounding - Ansible code. Performance-related bug reports for any scenario remain - welcome with open arms. - -* Performance on Python 3 is significantly worse than on Python 2. While this - has not yet been investigated, at least some of the regression appears to be - part of the core library, and should therefore be straightforward to fix as - part of 0.2.x. - -* *Module Replacer* style Ansible modules are not supported. - -* Actions are single-threaded for each `(host, user account)` combination, - including actions that execute on the local machine. Playbooks may experience - slowdown compared to vanilla Ansible if they employ long-running - ``local_action`` or ``delegate_to`` tasks delegating many target hosts to a - single machine and user account. - -* Connection Delegation remains in preview and has bugs around how it infers - connections. Connection establishment will remain single-threaded for the 0.2 - series, however connection inference bugs will be addressed in a future 0.2 - release. - -* Connection Delegation does not support automatic tunnelling of SSH-dependent - actions, such as the ``synchronize`` module. This will be addressed in the - 0.3 series. +* `#542 `_: some versions of OS X + ship a default Python that does not support :func:`select.poll`. Restore the + 0.2.3 behaviour of defaulting to Kqueue in this case, but still prefer + :func:`select.poll` if it is available. + +* `#545 `_: an optimization + introduced in `#493 `_ caused a + 64-bit integer to be assigned to a 32-bit field on ARM 32-bit targets, + causing runs to fail. + +* `#548 `_: `mitogen_via=` could fail + when the selected transport was set to ``smart``. + +* `#550 `_: avoid some broken + TTY-related `ioctl()` calls on Windows Subsystem for Linux 2016 Anniversary + Update. + +* `#554 `_: third party Ansible + action plug-ins that invoked :func:`_make_tmp_path` repeatedly could trigger + an assertion failure. + +* `#555 `_: work around an old idiom + that reloaded :mod:`sys` in order to change the interpreter's default encoding. + +* `ffae0355 `_: needless + information was removed from the documentation and installation procedure. Core Library ~~~~~~~~~~~~ -* Serialization is still based on :mod:`pickle`. While there is high confidence - remote code execution is impossible in Mitogen's configuration, an untrusted - context may at least trigger disproportionately high memory usage injecting - small messages (*"billion laughs attack"*). Replacement is an important - future priority, but not critical for an initial release. +* `#535 `_: to support function calls + on a service pool from another thread, :class:`mitogen.select.Select` + additionally permits waiting on :class:`mitogen.core.Latch`. -* Child processes are not reliably reaped, leading to a pileup of zombie - processes when a program makes many short-lived connections in a single - invocation. This does not impact Mitogen for Ansible, however it limits the - usefulness of the core library. A future 0.2 release will address it. +* `#535 `_: + :class:`mitogen.service.Pool.defer` allows any function to be enqueued for + the thread pool from another thread. -* Some races remain around :class:`mitogen.core.Broker ` destruction, - disconnection and corresponding file descriptor closure. These are only - problematic in situations where child process reaping is also problematic. +* `#535 `_: a new + :mod:`mitogen.os_fork` module provides a :func:`os.fork` wrapper that pauses + thread activity during fork. On Python<2.6, :class:`mitogen.core.Broker` and + :class:`mitogen.service.Pool` automatically record their existence so that a + :func:`os.fork` monkey-patch can automatically pause them for any attempt to + start a subprocess. -* The `fakessh` component does not shut down correctly and requires flow - control added to the design. While minimal fixes are possible, due to the - absence of flow control the original design is functionally incomplete. +* `ca63c26e `_: + :meth:`mitogen.core.Latch.put`'s `obj` argument was made optional. -* The multi-threaded :ref:`service` remains in a state of design flux and - should be considered obsolete, despite heavy use in Mitogen for Ansible. A - future replacement may be integrated more tightly with, or entirely replace - the RPC dispatcher on the main thread. -* Documentation is in a state of disrepair. This will be improved over the 0.2 - series. +Thanks! +~~~~~~~ + +Mitogen would not be possible without the support of users. A huge thanks for +bug reports, testing, features and fixes in this release contributed by +`Fabian Arrotin `_, +`Giles Westwood `_, +`Matt Layman `_, +`Percy Grunwald `_, +`Petr Enkov `_, +`Tony Finch `_, +`@elbunda `_, and +`@zyphermonkey `_. v0.2.5 (2019-02-14) diff --git a/docs/conf.py b/docs/conf.py index 3708a9438..a6bc2cbc9 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -6,7 +6,7 @@ VERSION = '%s.%s.%s' % mitogen.__version__ author = u'David Wilson' -copyright = u'2018, David Wilson' +copyright = u'2019, David Wilson' exclude_patterns = ['_build'] extensions = ['sphinx.ext.autodoc', 'sphinx.ext.intersphinx', 'sphinxcontrib.programoutput'] html_show_sourcelink = False diff --git a/mitogen/__init__.py b/mitogen/__init__.py index 08f875d4c..048798d96 100644 --- a/mitogen/__init__.py +++ b/mitogen/__init__.py @@ -35,7 +35,7 @@ #: Library version as a tuple. -__version__ = (0, 2, 5) +__version__ = (0, 2, 6) #: This is :data:`False` in slave contexts. Previously it was used to prevent diff --git a/mitogen/core.py b/mitogen/core.py index 470b00caa..578337f7c 100644 --- a/mitogen/core.py +++ b/mitogen/core.py @@ -103,6 +103,9 @@ IOLOG.setLevel(logging.INFO) LATIN1_CODEC = encodings.latin_1.Codec() +# str.encode() may take import lock. Deadlock possible if broker calls +# .encode() on behalf of thread currently waiting for module. +UTF8_CODEC = encodings.latin_1.Codec() _v = False _vv = False @@ -137,7 +140,6 @@ except NameError: BaseException = Exception -IS_WSL = 'Microsoft' in os.uname()[2] PY24 = sys.version_info < (2, 5) PY3 = sys.version_info > (3,) if PY3: @@ -161,6 +163,14 @@ except NameError: next = lambda it: it.next() +# #550: prehistoric WSL did not advertise itself in uname output. +try: + fp = open('/proc/sys/kernel/osrelease') + IS_WSL = 'Microsoft' in fp.read() + fp.close() +except IOError: + IS_WSL = False + #: Default size for calls to :meth:`Side.read` or :meth:`Side.write`, and the #: size of buffers configured by :func:`mitogen.parent.create_socketpair`. This @@ -271,9 +281,8 @@ def __init__(self, dct): def __init__(self, dct): for k, v in dct.iteritems(): if type(k) is unicode: - self[k.encode()] = v - else: - self[k] = v + k, _ = UTF8_CODEC.encode(k) + self[k] = v def __repr__(self): return 'Kwargs(%s)' % (dict.__repr__(self),) @@ -735,7 +744,7 @@ def dead(cls, reason=None, **kwargs): """ Syntax helper to construct a dead message. """ - kwargs['data'] = (reason or u'').encode() + kwargs['data'], _ = UTF8_CODEC.encode(reason or u'') return cls(reply_to=IS_DEAD, **kwargs) @classmethod @@ -1092,6 +1101,7 @@ class Importer(object): 'lxd', 'master', 'minify', + 'os_fork', 'parent', 'select', 'service', @@ -1332,7 +1342,7 @@ def load_module(self, fullname): if mod.__package__ and not PY3: # 2.x requires __package__ to be exactly a string. - mod.__package__ = mod.__package__.encode() + mod.__package__, _ = UTF8_CODEC.encode(mod.__package__) source = self.get_source(fullname) try: @@ -1912,6 +1922,8 @@ class Poller(object): Pollers may only be used by one thread at a time. """ + SUPPORTED = True + # This changed from select() to poll() in Mitogen 0.2.4. Since poll() has # no upper FD limit, it is suitable for use with Latch, which must handle # FDs larger than select's limit during many-host runs. We want this @@ -1928,11 +1940,16 @@ class Poller(object): def __init__(self): self._rfds = {} self._wfds = {} - self._pollobj = select.poll() def __repr__(self): return '%s(%#x)' % (type(self).__name__, id(self)) + def _update(self, fd): + """ + Required by PollPoller subclass. + """ + pass + @property def readers(self): """ @@ -1955,20 +1972,6 @@ def close(self): """ pass - _readmask = select.POLLIN | select.POLLHUP - # TODO: no proof we dont need writemask too - - def _update(self, fd): - mask = (((fd in self._rfds) and self._readmask) | - ((fd in self._wfds) and select.POLLOUT)) - if mask: - self._pollobj.register(fd, mask) - else: - try: - self._pollobj.unregister(fd) - except KeyError: - pass - def start_receive(self, fd, data=None): """ Cause :meth:`poll` to yield `data` when `fd` is readable. @@ -2004,22 +2007,27 @@ def stop_transmit(self, fd): self._update(fd) def _poll(self, timeout): + (rfds, wfds, _), _ = io_op(select.select, + self._rfds, + self._wfds, + (), timeout + ) + + for fd in rfds: + _vv and IOLOG.debug('%r: POLLIN for %r', self, fd) + data, gen = self._rfds.get(fd, (None, None)) + if gen and gen < self._generation: + yield data + + for fd in wfds: + _vv and IOLOG.debug('%r: POLLOUT for %r', self, fd) + data, gen = self._wfds.get(fd, (None, None)) + if gen and gen < self._generation: + yield data + if timeout: timeout *= 1000 - events, _ = io_op(self._pollobj.poll, timeout) - for fd, event in events: - if event & self._readmask: - _vv and IOLOG.debug('%r: POLLIN|POLLHUP for %r', self, fd) - data, gen = self._rfds.get(fd, (None, None)) - if gen and gen < self._generation: - yield data - if event & select.POLLOUT: - _vv and IOLOG.debug('%r: POLLOUT for %r', self, fd) - data, gen = self._wfds.get(fd, (None, None)) - if gen and gen < self._generation: - yield data - def poll(self, timeout=None): """ Block the calling thread until one or more FDs are ready for IO. @@ -2053,6 +2061,8 @@ class Latch(object): """ poller_class = Poller + notify = None + # The _cls_ prefixes here are to make it crystal clear in the code which # state mutation isn't covered by :attr:`_lock`. @@ -2142,7 +2152,7 @@ def _get_socketpair(self): return rsock, wsock COOKIE_MAGIC, = struct.unpack('L', b('LTCH') * (struct.calcsize('L')//4)) - COOKIE_FMT = 'Llll' + COOKIE_FMT = '>Qqqq' # #545: id() and get_ident() may exceed long on armhfp. COOKIE_SIZE = struct.calcsize(COOKIE_FMT) def _make_cookie(self): @@ -2242,11 +2252,14 @@ def _get_sleep(self, poller, timeout, block, rsock, wsock, cookie): finally: self._lock.release() - def put(self, obj): + def put(self, obj=None): """ Enqueue an object, waking the first thread waiting for a result, if one exists. + :param obj: + Object to enqueue. Defaults to :data:`None` as a convenience when + using :class:`Latch` only for synchronization. :raises mitogen.core.LatchError: :meth:`close` has been called, and the object is no longer valid. """ @@ -2263,6 +2276,8 @@ def put(self, obj): _vv and IOLOG.debug('%r.put() -> waking wfd=%r', self, wsock.fileno()) self._wake(wsock, cookie) + elif self.notify: + self.notify(self) finally: self._lock.release() @@ -2832,7 +2847,7 @@ class Broker(object): #: before force-disconnecting them during :meth:`shutdown`. shutdown_timeout = 3.0 - def __init__(self, poller_class=None): + def __init__(self, poller_class=None, activate_compat=True): self._alive = True self._exitted = False self._waker = Waker(self) @@ -2850,6 +2865,19 @@ def __init__(self, poller_class=None): name='mitogen.broker' ) self._thread.start() + if activate_compat: + self._py24_25_compat() + + def _py24_25_compat(self): + """ + Python 2.4/2.5 have grave difficulties with threads/fork. We + mandatorily quiesce all running threads during fork using a + monkey-patch there. + """ + if sys.version_info < (2, 6): + # import_module() is used to avoid dep scanner. + os_fork = import_module('mitogen.os_fork') + mitogen.os_fork._notice_broker_or_pool(self) def start_receive(self, stream): """ @@ -2995,6 +3023,7 @@ def _do_broker_main(self): except Exception: LOG.exception('_broker_main() crashed') + self._alive = False # Ensure _alive is consistent on crash. self._exitted = True self._broker_exit() @@ -3198,7 +3227,7 @@ def _setup_master(self): Router.max_message_size = self.config['max_message_size'] if self.config['profiling']: enable_profiling() - self.broker = Broker() + self.broker = Broker(activate_compat=False) self.router = Router(self.broker) self.router.debug = self.config.get('debug', False) self.router.undirectional = self.config['unidirectional'] @@ -3367,6 +3396,7 @@ def main(self): socket.gethostname()) _v and LOG.debug('Recovered sys.executable: %r', sys.executable) + self.broker._py24_25_compat() self.dispatcher.run() _v and LOG.debug('ExternalContext.main() normal exit') except KeyboardInterrupt: diff --git a/mitogen/os_fork.py b/mitogen/os_fork.py new file mode 100644 index 000000000..b27cfd5c3 --- /dev/null +++ b/mitogen/os_fork.py @@ -0,0 +1,183 @@ +# Copyright 2019, David Wilson +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are met: +# +# 1. Redistributions of source code must retain the above copyright notice, +# this list of conditions and the following disclaimer. +# +# 2. Redistributions in binary form must reproduce the above copyright notice, +# this list of conditions and the following disclaimer in the documentation +# and/or other materials provided with the distribution. +# +# 3. Neither the name of the copyright holder nor the names of its contributors +# may be used to endorse or promote products derived from this software without +# specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE +# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR +# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF +# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN +# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) +# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +# POSSIBILITY OF SUCH DAMAGE. + +# !mitogen: minify_safe + +""" +Support for operating in a mixed threading/forking environment. +""" + +import os +import socket +import sys +import weakref + +import mitogen.core + + +# List of weakrefs. On Python 2.4, mitogen.core registers its Broker on this +# list and mitogen.service registers its Pool too. +_brokers = weakref.WeakKeyDictionary() +_pools = weakref.WeakKeyDictionary() + + +def _notice_broker_or_pool(obj): + """ + Used by :mod:`mitogen.core` and :mod:`mitogen.service` to automatically + register every broker and pool on Python 2.4/2.5. + """ + if isinstance(obj, mitogen.core.Broker): + _brokers[obj] = True + else: + _pools[obj] = True + + +def wrap_os__fork(): + corker = Corker( + brokers=list(_brokers), + pools=list(_pools), + ) + try: + corker.cork() + return os__fork() + finally: + corker.uncork() + + +# If Python 2.4/2.5 where threading state is not fixed up, subprocess.Popen() +# may still deadlock due to the broker thread. In this case, pause os.fork() so +# that all active threads are paused during fork. +if sys.version_info < (2, 6): + os__fork = os.fork + os.fork = wrap_os__fork + + +class Corker(object): + """ + Arrange for :class:`mitogen.core.Broker` and optionally + :class:`mitogen.service.Pool` to be temporarily "corked" while fork + operations may occur. + + In a mixed threading/forking environment, it is critical no threads are + active at the moment of fork, as they could hold mutexes whose state is + unrecoverably snapshotted in the locked state in the fork child, causing + deadlocks at random future moments. + + To ensure a target thread has all locks dropped, it is made to write a + large string to a socket with a small buffer that has :data:`os.O_NONBLOCK` + disabled. CPython will drop the GIL and enter the ``write()`` system call, + where it will block until the socket buffer is drained, or the write side + is closed. + + :class:`mitogen.core.Poller` is used to ensure the thread really has + blocked outside any Python locks, by checking if the socket buffer has + started to fill. + + Since this necessarily involves posting a message to every existent thread + and verifying acknowledgement, it will never be a fast operation. + + This does not yet handle the case of corking being initiated from within a + thread that is also a cork target. + + :param brokers: + Sequence of :class:`mitogen.core.Broker` instances to cork. + :param pools: + Sequence of :class:`mitogen.core.Pool` instances to cork. + """ + def __init__(self, brokers=(), pools=()): + self.brokers = brokers + self.pools = pools + + def _do_cork(self, s, wsock): + try: + try: + while True: + # at least EINTR is possible. Do our best to keep handling + # outside the GIL in this case using sendall(). + wsock.sendall(s) + except socket.error: + pass + finally: + wsock.close() + + def _cork_one(self, s, obj): + """ + Construct a socketpair, saving one side of it, and passing the other to + `obj` to be written to by one of its threads. + """ + rsock, wsock = mitogen.parent.create_socketpair(size=4096) + mitogen.core.set_cloexec(rsock.fileno()) + mitogen.core.set_cloexec(wsock.fileno()) + mitogen.core.set_block(wsock) # gevent + self._rsocks.append(rsock) + obj.defer(self._do_cork, s, wsock) + + def _verify_one(self, rsock): + """ + Pause until the socket `rsock` indicates readability, due to + :meth:`_do_cork` triggering a blocking write on another thread. + """ + poller = mitogen.core.Poller() + poller.start_receive(rsock.fileno()) + try: + while True: + for fd in poller.poll(): + return + finally: + poller.close() + + def cork(self): + """ + Arrange for any associated brokers and pools to be paused with no locks + held. This will not return until each thread acknowledges it has ceased + execution. + """ + s = mitogen.core.b('CORK') * ((128 // 4) * 1024) + self._rsocks = [] + + # Pools must be paused first, as existing work may require the + # participation of a broker in order to complete. + for pool in self.pools: + if not pool.closed: + for x in range(pool.size): + self._cork_one(s, pool) + + for broker in self.brokers: + if broker._alive: + self._cork_one(s, broker) + + # Pause until we can detect every thread has entered write(). + for rsock in self._rsocks: + self._verify_one(rsock) + + def uncork(self): + """ + Arrange for paused threads to resume operation. + """ + for rsock in self._rsocks: + rsock.close() diff --git a/mitogen/parent.py b/mitogen/parent.py index 7e567aaae..3d02bc43e 100644 --- a/mitogen/parent.py +++ b/mitogen/parent.py @@ -253,7 +253,7 @@ def close_nonstandard_fds(): pass -def create_socketpair(): +def create_socketpair(size=None): """ Create a :func:`socket.socketpair` to use for use as a child process's UNIX stdio channels. As socket pairs are bidirectional, they are economical on @@ -265,10 +265,10 @@ def create_socketpair(): parentfp, childfp = socket.socketpair() parentfp.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, - mitogen.core.CHUNK_SIZE) + size or mitogen.core.CHUNK_SIZE) childfp.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, - mitogen.core.CHUNK_SIZE) + size or mitogen.core.CHUNK_SIZE) return parentfp, childfp @@ -371,11 +371,12 @@ def create_child(args, merge_stdio=False, stderr_pipe=False, preexec_fn=None): def _acquire_controlling_tty(): os.setsid() - if sys.platform == 'linux2': + if sys.platform in ('linux', 'linux2'): # On Linux, the controlling tty becomes the first tty opened by a # process lacking any prior tty. os.close(os.open(os.ttyname(2), os.O_RDWR)) - if hasattr(termios, 'TIOCSCTTY'): + if hasattr(termios, 'TIOCSCTTY') and not mitogen.core.IS_WSL: + # #550: prehistoric WSL does not like TIOCSCTTY. # On BSD an explicit ioctl is required. For some inexplicable reason, # Python 2.6 on Travis also requires it. fcntl.ioctl(2, termios.TIOCSCTTY) @@ -890,10 +891,58 @@ def __repr__(self): ) +class PollPoller(mitogen.core.Poller): + """ + Poller based on the POSIX poll(2) interface. Not available on some versions + of OS X, otherwise it is the preferred poller for small FD counts. + """ + SUPPORTED = hasattr(select, 'poll') + _repr = 'PollPoller()' + + def __init__(self): + super(PollPoller, self).__init__() + self._pollobj = select.poll() + + # TODO: no proof we dont need writemask too + _readmask = ( + getattr(select, 'POLLIN', 0) | + getattr(select, 'POLLHUP', 0) + ) + + def _update(self, fd): + mask = (((fd in self._rfds) and self._readmask) | + ((fd in self._wfds) and select.POLLOUT)) + if mask: + self._pollobj.register(fd, mask) + else: + try: + self._pollobj.unregister(fd) + except KeyError: + pass + + def _poll(self, timeout): + if timeout: + timeout *= 1000 + + events, _ = mitogen.core.io_op(self._pollobj.poll, timeout) + for fd, event in events: + if event & self._readmask: + IOLOG.debug('%r: POLLIN|POLLHUP for %r', self, fd) + data, gen = self._rfds.get(fd, (None, None)) + if gen and gen < self._generation: + yield data + if event & select.POLLOUT: + IOLOG.debug('%r: POLLOUT for %r', self, fd) + data, gen = self._wfds.get(fd, (None, None)) + if gen and gen < self._generation: + yield data + + class KqueuePoller(mitogen.core.Poller): """ Poller based on the FreeBSD/Darwin kqueue(2) interface. """ + SUPPORTED = hasattr(select, 'kqueue') _repr = 'KqueuePoller()' def __init__(self): @@ -971,6 +1020,7 @@ class EpollPoller(mitogen.core.Poller): """ Poller based on the Linux epoll(2) interface. """ + SUPPORTED = hasattr(select, 'epoll') _repr = 'EpollPoller()' def __init__(self): @@ -1041,20 +1091,18 @@ def _poll(self, timeout): yield data -if sys.version_info < (2, 6): - # 2.4 and 2.5 only had select.select() and select.poll(). - POLLER_BY_SYSNAME = {} -else: - POLLER_BY_SYSNAME = { - 'Darwin': KqueuePoller, - 'FreeBSD': KqueuePoller, - 'Linux': EpollPoller, - } +# 2.4 and 2.5 only had select.select() and select.poll(). +for _klass in mitogen.core.Poller, PollPoller, KqueuePoller, EpollPoller: + if _klass.SUPPORTED: + PREFERRED_POLLER = _klass -PREFERRED_POLLER = POLLER_BY_SYSNAME.get( - os.uname()[0], - mitogen.core.Poller, -) +# For apps that start threads dynamically, it's possible Latch will also get +# very high-numbered wait fds when there are many connections, and so select() +# becomes useless there too. So swap in our favourite poller. +if PollPoller.SUPPORTED: + mitogen.core.Latch.poller_class = PollPoller +else: + mitogen.core.Latch.poller_class = PREFERRED_POLLER class DiagLogStream(mitogen.core.BasicStream): diff --git a/mitogen/select.py b/mitogen/select.py index fd2cbe9ae..51aebc227 100644 --- a/mitogen/select.py +++ b/mitogen/select.py @@ -35,12 +35,25 @@ class Error(mitogen.core.Error): pass +class Event(object): + """ + Represents one selected event. + """ + #: The first Receiver or Latch the event traversed. + source = None + + #: The :class:`mitogen.core.Message` delivered to a receiver, or the object + #: posted to a latch. + data = None + + class Select(object): """ Support scatter/gather asynchronous calls and waiting on multiple - receivers, channels, and sub-Selects. Accepts a sequence of - :class:`mitogen.core.Receiver` or :class:`mitogen.select.Select` instances - and returns the first value posted to any receiver or select. + :class:`receivers `, + :class:`channels `, + :class:`latches `, and + :class:`sub-selects