From 5be609693c03bb4e3840c848ed2d4dda8b86d6ea Mon Sep 17 00:00:00 2001 From: Stephen Milner Date: Thu, 8 Dec 2016 12:31:24 -0500 Subject: [PATCH] ContainerManagerService (#35) * storage: StorageService no longer handles container work. * containermgr-service: Added the new container maanger service. This service owns container management functions such as registering a node to a container manager. It follows a similar mechanic to the StorageService in that it is configurable to work with different backend systems using a handler plugin pattern. * containermgr-service: Build and install additions. * fixup! containermgr-service: Added the new container maanger service. --- conf/containermgr.conf | 7 + conf/systemd/commissaire-containermgr.service | 12 + setup.py | 2 + .../containermgr/__init__.py | 256 ++++++++++++++++++ .../containermgr/containerhandlermanager.py | 57 ++++ src/commissaire_service/storage/__init__.py | 37 --- ...st_containermgr_containerhandlermanager.py | 62 +++++ test/test_service_containermgr.py | 205 ++++++++++++++ 8 files changed, 601 insertions(+), 37 deletions(-) create mode 100644 conf/containermgr.conf create mode 100644 conf/systemd/commissaire-containermgr.service create mode 100644 src/commissaire_service/containermgr/__init__.py create mode 100644 src/commissaire_service/containermgr/containerhandlermanager.py create mode 100644 test/test_containermgr_containerhandlermanager.py create mode 100644 test/test_service_containermgr.py diff --git a/conf/containermgr.conf b/conf/containermgr.conf new file mode 100644 index 0000000..d2dec62 --- /dev/null +++ b/conf/containermgr.conf @@ -0,0 +1,7 @@ +{ + "container_handlers": [{ + "name": "OpenshiftA1", + "handler": "commissaire.containermgr.kubernetes", + "server_url": "http://192.168.152.102:8080/" + }] +} diff --git a/conf/systemd/commissaire-containermgr.service b/conf/systemd/commissaire-containermgr.service new file mode 100644 index 0000000..1db3141 --- /dev/null +++ b/conf/systemd/commissaire-containermgr.service @@ -0,0 +1,12 @@ +[Unit] +Description=Commissaire Container Manager Service +Documentation=https://commissaire.readthedocs.io/ +After=network.target + +[Service] +ExecStart=/usr/bin/commissaire-containermgr-service -c /etc/commisasire/containermgr.conf +PIDFile=/var/run/commissaire-containermgr-service.pid +Type=simple + +[Install] +WantedBy=multi-user.target diff --git a/setup.py b/setup.py index aad0919..1fa487c 100755 --- a/setup.py +++ b/setup.py @@ -68,6 +68,8 @@ def extract_requirements(filename): 'commissaire_service.storage:main'), ('commissaire-clusterexec-service = ' 'commissaire_service.clusterexec:main'), + ('commissaire-containermgr-service = ' + 'commissaire_service.containermgr:main'), ('commissaire-investigator-service = ' 'commissaire_service.investigator:main'), ('commissaire-watcher-service = ' diff --git a/src/commissaire_service/containermgr/__init__.py b/src/commissaire_service/containermgr/__init__.py new file mode 100644 index 0000000..b2b9041 --- /dev/null +++ b/src/commissaire_service/containermgr/__init__.py @@ -0,0 +1,256 @@ +# Copyright (C) 2016 Red Hat, Inc +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +import importlib + +from commissaire.containermgr import ContainerManagerError +from commissaire.util.config import ConfigurationError, read_config_file + +from commissaire_service.service import CommissaireService +from commissaire_service.containermgr.containerhandlermanager import ( + ContainerHandlerManager) + + +class ContainerManagerService(CommissaireService): + """ + Provides access to Container Managers. + """ + + def __init__(self, exchange_name, connection_url, config_file=None): + """ + Creates a new ContainerManagerService and sets up ContainerHandler + instances according to the config_file. If config_file is omitted, + it will try the default location (/etc/commissaire/commissaire.conf). + + :param exchange_name: Name of the topic exchange + :type exchange_name: str + :param connection_url: Kombu connection URL + :type connection_url: str + :param config_file: Optional configuration file path + :type config_file: str or None + """ + queue_kwargs = [{ + 'name': 'containermgr', + 'routing_key': 'container.*', + 'exclusive': False, + }] + super().__init__(exchange_name, connection_url, queue_kwargs) + self._manager = ContainerHandlerManager() + + config_data = read_config_file(config_file) + container_handlers = config_data.get('container_handlers', []) + + if len(container_handlers) == 0: + self.logger.info('No ContainerManagerHandlers were provided.') + for config in container_handlers: + self.register(config) + + def register(self, config): + """ + Registers a new container handler type after extracting and validating + information required for registration from the configuration data. + + :param config: A configuration dictionary + :type config: dict + """ + if type(config) is not dict: + raise ConfigurationError( + 'Store handler format must be a JSON object, got a ' + '{} instead: {}'.format(type(config).__name__, config)) + + # Import the handler class. + try: + module_name = config.pop('handler') + except KeyError as error: + raise ConfigurationError( + 'Container handler configuration missing "{}" key: ' + '{}'.format(error, config)) + try: + module = importlib.import_module(module_name) + handler_type = getattr(module, 'ContainerHandler') + except ImportError: + raise ConfigurationError( + 'Invalid container handler module name: {}'.format( + module_name)) + + self._manager.register(handler_type, config) + + def on_list_handlers(self, message): + """ + Handler for the "container.list_handlers" routing key. + + Returns a list of registered container handlers as dictionaries. + Each dictionary contains the following: + + 'name' : The name of the container handler + 'handler_type' : Type type of the container handler + 'config' : Dictionary of configuration values + + :param message: A message instance + :type message: kombu.message.Message + """ + result = [] + for name, handler in self._manager.handlers.items(): + result.append({ + 'name': name, + 'handler_type': handler.__class__.__name__, + }) + return result + + def on_node_registered(self, message, container_handler_name, address): + """ + Checks if a node is registered to a specific container manager. + + :param message: A message instance + :type message: kombu.message.Message + :param container_handler_name: Name of the ContainerHandler to use. + :type container_handler_name: str + :param address: Address of the node + :type address: str + :returns: Whether the node is registered + :rtype: bool + """ + return self._node_operation( + container_handler_name, 'node_registered', address) + + def on_register_node(self, message, container_handler_name, address): + """ + Registers a node to a container manager. + + :param message: A message instance + :type message: kombu.message.Message + :param container_handler_name: Name of the ContainerHandler to use. + :type container_handler_name: str + :param address: Address of the node + :type address: str + :returns: Whether the node is registered + :rtype: bool + """ + return self._node_operation( + container_handler_name, 'register_node', address) + + def on_remove_node(self, message, container_handler_name, address): + """ + Removes a node from a container manager. + + :param message: A message instance + :type message: kombu.message.Message + :param container_handler_name: Name of the ContainerHandler to use. + :type container_handler_name: str + :param address: Address of the node + :type address: str + :returns: Whether the node is registered + :rtype: bool + """ + return self._node_operation( + container_handler_name, 'remove_node', address) + + def _node_operation(self, container_handler_name, method, address): + """ + Common code for getting node information. + + :param container_handler_name: Name of the ContainerHandler to use. + :type container_handler_name: str + :param method: The containermgr method to call. + :type method: str + :param address: Address of the node + :type address: str + :returns: Whether the node is registered + :rtype: bool + """ + try: + container_handler = self._manager.handlers[container_handler_name] + result = getattr(container_handler, method).__call__(address) + + self.logger.info( + '{} called for {} via the container manager {}'.format( + method, address, container_handler_name)) + self.logger.debug('Result: {}'.format(result)) + + if bool(result): + return result + + except ContainerManagerError as error: + self.logger.info('{} raised ContainerManagerError: {}'.format( + error)) + except KeyError: + self.logger.error('ContainerHandler {} does not exist.'.format( + container_handler_name)) + except Exception as error: + self.logger.error( + 'Unexpected error while attempting {} for node "{}" with ' + 'containermgr "{}". {}: {}'.format( + method, address, container_handler_name, + error.__class__.__name__, error)) + + return False + + def on_get_node_status(self, message, container_handler_name, address): + """ + Gets a nodes status from the container manager. + + :param message: A message instance + :type message: kombu.message.Message + :param container_handler_name: Name of the ContainerHandler to use. + :type container_handler_name: str + :param address: Address of the node + :type address: str + :returns: Status of the node according to the container manager. + :rtype: dict + """ + result = self._node_operation( + container_handler_name, 'get_node_status', address) + if result is False: + error = 'No status available for node {}'.format(address) + self.logger.error(result) + raise Exception(error) + return result + + +def main(): # pragma: no cover + """ + Main entry point. + """ + import argparse + + parser = argparse.ArgumentParser() + parser.add_argument( + '-c', '--config', type=str, + help='Configuration file to use.') + parser.add_argument( + '--bus-exchange', type=str, default='commissaire', + help='Message bus exchange name.') + parser.add_argument( + '--bus-uri', type=str, metavar='BUS_URI', + default='redis://127.0.0.1:6379/', # FIXME: Remove before release + help=( + 'Message bus connection URI. See:' + 'http://kombu.readthedocs.io/en/latest/userguide/connections.html') + ) + + args = parser.parse_args() + + try: + service = ContainerManagerService( + exchange_name=args.bus_exchange, + connection_url=args.bus_uri, + config_file=args.config) + service.run() + except KeyboardInterrupt: + pass + + +if __name__ == '__main__': # pragma: no cover + main() diff --git a/src/commissaire_service/containermgr/containerhandlermanager.py b/src/commissaire_service/containermgr/containerhandlermanager.py new file mode 100644 index 0000000..fcfcaef --- /dev/null +++ b/src/commissaire_service/containermgr/containerhandlermanager.py @@ -0,0 +1,57 @@ +# Copyright (C) 2016 Red Hat, Inc +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +import logging + + +class ContainerHandlerManager(object): + """ + Configures ContainerHandler instances and routes storage requests to + the appropriate handler. + """ + + def __init__(self): + """ + Creates a new ContainerHandlerManager instance. + """ + self._handlers = {} + self.logger = logging.getLogger('containermgr') + self.logger.setLevel(logging.DEBUG) + + def register(self, handler_type, config): + """ + Registers a ContainerHandler for use in remote calls. + + :param handler_type: A class derived from ContainerHandler + :type handler_type: type + :param config: Configuration parameters for the handler + :type config: dict + """ + handler_type.check_config(config) + self._handlers[config['name']] = handler_type(config) + self.logger.info('Registered container handler {}'.format( + config['name'])) + self.logger.debug('{}: {}'.format( + self._handlers[config['name']], config)) + + @property + def handlers(self): + """ + Returns all configured container manager instances. + + :returns: dict of container managers + :rtype: dict + """ + return self._handlers diff --git a/src/commissaire_service/storage/__init__.py b/src/commissaire_service/storage/__init__.py index 2f2854a..cf0833b 100644 --- a/src/commissaire_service/storage/__init__.py +++ b/src/commissaire_service/storage/__init__.py @@ -17,8 +17,6 @@ import importlib import json -from time import sleep - import commissaire.models as models from commissaire import constants as C @@ -230,41 +228,6 @@ def on_list_store_handlers(self, message): }) return result - def on_node_registered(self, message, cluster_type, address): - """ - Checks if a cluster node at the given address is registered on a - cluster of the given type. This method may take several seconds - to complete if the cluster node is unresponsive, as it retries a - few times with a sleep delay. - - :param message: A message instance - :type message: kombu.message.Message - :param cluster_type: A cluster type constant - :type cluster_type: str - :param address: Address of the cluster node - :type address: str - :returns: Whether the node is registered - :rtype: bool - """ - for con_mgr in self._manager.list_container_managers(cluster_type): - # Try 3 times waiting 5 seconds each time before giving up. - for attempt in range(3): - if con_mgr.node_registered(address): - self.logger.info( - '{} has been registered with the ' - 'container manager'.format(address)) - return True - if attempt == 2: - self.logger.warn( - 'Could not register with the container manager') - return False - self.logger.debug( - '{} has not been registered with the container ' - 'manager. Checking again in 5 seconds...'.format( - address)) - sleep(5) - return False - def main(): # pragma: no cover """ diff --git a/test/test_containermgr_containerhandlermanager.py b/test/test_containermgr_containerhandlermanager.py new file mode 100644 index 0000000..dd61638 --- /dev/null +++ b/test/test_containermgr_containerhandlermanager.py @@ -0,0 +1,62 @@ +# Copyright (C) 2016 Red Hat, Inc +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +""" +Tests for commissaire_service.containermgr.ContainerHandlerManager class. +""" + +import logging + +from . import TestCase, mock +from commissaire.util.config import ConfigurationError +from commissaire.containermgr.kubernetes import KubeContainerManager +from commissaire_service.containermgr.containerhandlermanager import ( + ContainerHandlerManager) + + +class TestContainerHandlerManager(TestCase): + """ + Tests for the ContainerHandlerManager class. + """ + + def setUp(self): + """ + Set up before each test. + """ + self.instance = ContainerHandlerManager() + + def test_initialization(self): + """ + Verify ContainerHandlerManager initializes as expected. + """ + self.assertEquals({}, self.instance.handlers) + self.assertTrue(isinstance(self.instance.logger, logging.Logger)) + + def test_register_with_valid_type(self): + """ + Verify ContainerHandlerManager.register successfully registers ContainerHandlers. + """ + self.instance.register( + KubeContainerManager, + config={'name': 'test', 'server_url': 'http://127.0.0.1:8080/'} + ) + + def test_register_with_invalid_type(self): + """ + Verify ContainerHandlerManager.register fails with invalid types. + """ + self.assertRaises( + ConfigurationError, + self.instance.register, + KubeContainerManager, {}) diff --git a/test/test_service_containermgr.py b/test/test_service_containermgr.py new file mode 100644 index 0000000..1271cd7 --- /dev/null +++ b/test/test_service_containermgr.py @@ -0,0 +1,205 @@ +# Copyright (C) 2016 Red Hat, Inc +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +""" +Tests for commissaire_service.service.containermgr.ContainerManagerService. +""" + +from . import TestCase, mock + +from commissaire_service.containermgr import ContainerManagerService +from commissaire.containermgr.kubernetes import ContainerHandler + + +class TestContainerManagerService(TestCase): + """ + Tests for the ContainerManagerService class. + """ + + def setUp(self): + """ + Called before each test case. + """ + self._connection_patcher = mock.patch( + 'commissaire_service.service.Connection') + self._exchange_patcher = mock.patch( + 'commissaire_service.service.Exchange') + self._producer_patcher = mock.patch( + 'commissaire_service.service.Producer') + self._connection = self._connection_patcher.start() + self._exchange = self._exchange_patcher.start() + self._producer = self._producer_patcher.start() + + self.queue_kwargs = [ + {'name': 'simple', 'routing_key': 'simple.*'}, + ] + + self.service_instance = ContainerManagerService( + 'commissaire', + 'redis://127.0.0.1:6379/' + ) + + def tearDown(self): + """ + Called after each test case. + """ + self._connection.stop() + self._exchange.stop() + self._producer.stop() + + def test_register(self): + """ + Verify ContainerManagerService.register can properly register handlers. + """ + name = 'test' + self.service_instance.register({ + 'name': name, + 'handler':'commissaire.containermgr.kubernetes', + 'server_url': 'https://127.0.0.1:8080/' + }) + # There should be 1 handler of the imported ContainerHandler type + self.assertEquals(1, len(self.service_instance._manager.handlers)) + self.assertIn(name, self.service_instance._manager.handlers) + self.assertIsInstance( + self.service_instance._manager.handlers[name], + ContainerHandler) + + def test_on_list_handler_when_empty(self): + """ + Verify ContainerManagerService.on_list_handlers returns an empty list by default. + """ + message = mock.MagicMock( + payload='', + delivery_info={ + 'routing_key': 'container.list_handlers'}) + + result = self.service_instance.on_list_handlers(message) + # There should be no handlers by default + self.assertEquals([], result) + + def test_on_list_handler_with_handler(self): + """ + Verify ContainerManagerService.on_list_handlers returns a handler when one has been registered. + """ + self.service_instance._manager._handlers = { + 'test': ContainerHandler(config={ + 'server_url':'https://127.0.0.1:8080/'})} + + message = mock.MagicMock( + payload='', + delivery_info={ + 'routing_key': 'container.list_handlers'}) + + result = self.service_instance.on_list_handlers(message) + # There should be no handlers by default + self.assertEquals([{ + 'name': 'test', 'handler_type': 'KubeContainerManager'}], result) + + def test_on_node_registered(self): + """ + Verify ContainerManagerService.on_node_registered returns proper data. + """ + message = mock.MagicMock( + payload='', + delivery_info={ + 'routing_key': 'container.node_registered'}) + + for code, result in ((200, True), (404, False)): + ch = mock.MagicMock() + ch.node_registered.return_value = result + self.service_instance._manager._handlers = {'test': ch} + + self.assertEquals( + result, + self.service_instance.on_node_registered( + message, 'test', '127.0.0.1')) + + def test_on_register_node_and_remove_node(self): + """ + Verify on_register/remove_node responds properly. + """ + for method in ('register_node', 'remove_node'): + message = mock.MagicMock( + payload='', + delivery_info={ + 'routing_key': 'container.{}'.format(method)}) + + for code, result in ((201, True), (404, False)): + ch = mock.MagicMock() + getattr(ch, method).return_value = result + self.service_instance._manager._handlers = {'test': ch} + + self.assertEquals( + result, + getattr(self.service_instance, 'on_{}'.format(method))( + message, 'test', '127.0.01')) + + def test_on_register_node_and_remove_node_with_exceptions(self): + """ + Verify on_register/remove_node handle exceptions. + """ + for method in ('register_node', 'remove_node'): + message = mock.MagicMock( + payload='', + delivery_info={ + 'routing_key': 'container.{}'.format(method)}) + + for exc in (KeyError, Exception): + # XXX: This isn't the exact place the exceptions would be + # raised, but it is in the correct block + ch = mock.MagicMock() + getattr(ch, method).side_effect = exc + self.service_instance._manager._handlers = {'test': ch} + + self.assertEquals( + False, + getattr(self.service_instance, 'on_{}'.format(method))( + message, 'test', '127.0.01')) + + def test_on_get_node_status(self): + """ + Verify ContainerManagerService.get_node_status returns proper data on success. + """ + message = mock.MagicMock( + payload='', + delivery_info={ + 'routing_key': 'container.get_node_status'}) + + expected = {'test': 'test'} + ch = mock.MagicMock() + ch.get_node_status.return_value = expected + self.service_instance._manager._handlers = {'test': ch} + + self.assertEquals( + expected, + self.service_instance.on_get_node_status( + message, 'test', '127.0.0.1')) + + def test_on_get_node_status_with_failure(self): + """ + Verify ContainerManagerService.get_node_status returns proper data on failure. + """ + message = mock.MagicMock( + payload='', + delivery_info={ + 'routing_key': 'container.get_node_status'}) + + ch = mock.MagicMock() + ch.get_node_status.side_effect = Exception + self.service_instance._manager._handlers = {'test': ch} + + self.assertRaises( + Exception, + self.service_instance.on_get_node_status, + message, 'test', '127.0.0.1')