Skip to content
This repository has been archived by the owner on Mar 22, 2018. It is now read-only.

Commit

Permalink
ContainerManagerService (#35)
Browse files Browse the repository at this point in the history
* storage: StorageService no longer handles container work.

* containermgr-service: Added the new container maanger service.

This service owns container management functions such as registering a
node to a container manager. It follows a similar mechanic to the
StorageService in that it is configurable to work with different backend
systems using a handler plugin pattern.

* containermgr-service: Build and install additions.

* fixup! containermgr-service: Added the new container maanger service.
  • Loading branch information
ashcrow authored and mbarnes committed Dec 8, 2016
1 parent b570f53 commit 5be6096
Show file tree
Hide file tree
Showing 8 changed files with 601 additions and 37 deletions.
7 changes: 7 additions & 0 deletions conf/containermgr.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
{
"container_handlers": [{
"name": "OpenshiftA1",
"handler": "commissaire.containermgr.kubernetes",
"server_url": "http://192.168.152.102:8080/"
}]
}
12 changes: 12 additions & 0 deletions conf/systemd/commissaire-containermgr.service
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
[Unit]
Description=Commissaire Container Manager Service
Documentation=https://commissaire.readthedocs.io/
After=network.target

[Service]
ExecStart=/usr/bin/commissaire-containermgr-service -c /etc/commisasire/containermgr.conf
PIDFile=/var/run/commissaire-containermgr-service.pid
Type=simple

[Install]
WantedBy=multi-user.target
2 changes: 2 additions & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ def extract_requirements(filename):
'commissaire_service.storage:main'),
('commissaire-clusterexec-service = '
'commissaire_service.clusterexec:main'),
('commissaire-containermgr-service = '
'commissaire_service.containermgr:main'),
('commissaire-investigator-service = '
'commissaire_service.investigator:main'),
('commissaire-watcher-service = '
Expand Down
256 changes: 256 additions & 0 deletions src/commissaire_service/containermgr/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,256 @@
# Copyright (C) 2016 Red Hat, Inc
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

import importlib

from commissaire.containermgr import ContainerManagerError
from commissaire.util.config import ConfigurationError, read_config_file

from commissaire_service.service import CommissaireService
from commissaire_service.containermgr.containerhandlermanager import (
ContainerHandlerManager)


class ContainerManagerService(CommissaireService):
"""
Provides access to Container Managers.
"""

def __init__(self, exchange_name, connection_url, config_file=None):
"""
Creates a new ContainerManagerService and sets up ContainerHandler
instances according to the config_file. If config_file is omitted,
it will try the default location (/etc/commissaire/commissaire.conf).
:param exchange_name: Name of the topic exchange
:type exchange_name: str
:param connection_url: Kombu connection URL
:type connection_url: str
:param config_file: Optional configuration file path
:type config_file: str or None
"""
queue_kwargs = [{
'name': 'containermgr',
'routing_key': 'container.*',
'exclusive': False,
}]
super().__init__(exchange_name, connection_url, queue_kwargs)
self._manager = ContainerHandlerManager()

config_data = read_config_file(config_file)
container_handlers = config_data.get('container_handlers', [])

if len(container_handlers) == 0:
self.logger.info('No ContainerManagerHandlers were provided.')
for config in container_handlers:
self.register(config)

def register(self, config):
"""
Registers a new container handler type after extracting and validating
information required for registration from the configuration data.
:param config: A configuration dictionary
:type config: dict
"""
if type(config) is not dict:
raise ConfigurationError(
'Store handler format must be a JSON object, got a '
'{} instead: {}'.format(type(config).__name__, config))

# Import the handler class.
try:
module_name = config.pop('handler')
except KeyError as error:
raise ConfigurationError(
'Container handler configuration missing "{}" key: '
'{}'.format(error, config))
try:
module = importlib.import_module(module_name)
handler_type = getattr(module, 'ContainerHandler')
except ImportError:
raise ConfigurationError(
'Invalid container handler module name: {}'.format(
module_name))

self._manager.register(handler_type, config)

def on_list_handlers(self, message):
"""
Handler for the "container.list_handlers" routing key.
Returns a list of registered container handlers as dictionaries.
Each dictionary contains the following:
'name' : The name of the container handler
'handler_type' : Type type of the container handler
'config' : Dictionary of configuration values
:param message: A message instance
:type message: kombu.message.Message
"""
result = []
for name, handler in self._manager.handlers.items():
result.append({
'name': name,
'handler_type': handler.__class__.__name__,
})
return result

def on_node_registered(self, message, container_handler_name, address):
"""
Checks if a node is registered to a specific container manager.
:param message: A message instance
:type message: kombu.message.Message
:param container_handler_name: Name of the ContainerHandler to use.
:type container_handler_name: str
:param address: Address of the node
:type address: str
:returns: Whether the node is registered
:rtype: bool
"""
return self._node_operation(
container_handler_name, 'node_registered', address)

def on_register_node(self, message, container_handler_name, address):
"""
Registers a node to a container manager.
:param message: A message instance
:type message: kombu.message.Message
:param container_handler_name: Name of the ContainerHandler to use.
:type container_handler_name: str
:param address: Address of the node
:type address: str
:returns: Whether the node is registered
:rtype: bool
"""
return self._node_operation(
container_handler_name, 'register_node', address)

def on_remove_node(self, message, container_handler_name, address):
"""
Removes a node from a container manager.
:param message: A message instance
:type message: kombu.message.Message
:param container_handler_name: Name of the ContainerHandler to use.
:type container_handler_name: str
:param address: Address of the node
:type address: str
:returns: Whether the node is registered
:rtype: bool
"""
return self._node_operation(
container_handler_name, 'remove_node', address)

def _node_operation(self, container_handler_name, method, address):
"""
Common code for getting node information.
:param container_handler_name: Name of the ContainerHandler to use.
:type container_handler_name: str
:param method: The containermgr method to call.
:type method: str
:param address: Address of the node
:type address: str
:returns: Whether the node is registered
:rtype: bool
"""
try:
container_handler = self._manager.handlers[container_handler_name]
result = getattr(container_handler, method).__call__(address)

self.logger.info(
'{} called for {} via the container manager {}'.format(
method, address, container_handler_name))
self.logger.debug('Result: {}'.format(result))

if bool(result):
return result

except ContainerManagerError as error:
self.logger.info('{} raised ContainerManagerError: {}'.format(
error))
except KeyError:
self.logger.error('ContainerHandler {} does not exist.'.format(
container_handler_name))
except Exception as error:
self.logger.error(
'Unexpected error while attempting {} for node "{}" with '
'containermgr "{}". {}: {}'.format(
method, address, container_handler_name,
error.__class__.__name__, error))

return False

def on_get_node_status(self, message, container_handler_name, address):
"""
Gets a nodes status from the container manager.
:param message: A message instance
:type message: kombu.message.Message
:param container_handler_name: Name of the ContainerHandler to use.
:type container_handler_name: str
:param address: Address of the node
:type address: str
:returns: Status of the node according to the container manager.
:rtype: dict
"""
result = self._node_operation(
container_handler_name, 'get_node_status', address)
if result is False:
error = 'No status available for node {}'.format(address)
self.logger.error(result)
raise Exception(error)
return result


def main(): # pragma: no cover
"""
Main entry point.
"""
import argparse

parser = argparse.ArgumentParser()
parser.add_argument(
'-c', '--config', type=str,
help='Configuration file to use.')
parser.add_argument(
'--bus-exchange', type=str, default='commissaire',
help='Message bus exchange name.')
parser.add_argument(
'--bus-uri', type=str, metavar='BUS_URI',
default='redis://127.0.0.1:6379/', # FIXME: Remove before release
help=(
'Message bus connection URI. See:'
'http://kombu.readthedocs.io/en/latest/userguide/connections.html')
)

args = parser.parse_args()

try:
service = ContainerManagerService(
exchange_name=args.bus_exchange,
connection_url=args.bus_uri,
config_file=args.config)
service.run()
except KeyboardInterrupt:
pass


if __name__ == '__main__': # pragma: no cover
main()
57 changes: 57 additions & 0 deletions src/commissaire_service/containermgr/containerhandlermanager.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
# Copyright (C) 2016 Red Hat, Inc
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

import logging


class ContainerHandlerManager(object):
"""
Configures ContainerHandler instances and routes storage requests to
the appropriate handler.
"""

def __init__(self):
"""
Creates a new ContainerHandlerManager instance.
"""
self._handlers = {}
self.logger = logging.getLogger('containermgr')
self.logger.setLevel(logging.DEBUG)

def register(self, handler_type, config):
"""
Registers a ContainerHandler for use in remote calls.
:param handler_type: A class derived from ContainerHandler
:type handler_type: type
:param config: Configuration parameters for the handler
:type config: dict
"""
handler_type.check_config(config)
self._handlers[config['name']] = handler_type(config)
self.logger.info('Registered container handler {}'.format(
config['name']))
self.logger.debug('{}: {}'.format(
self._handlers[config['name']], config))

@property
def handlers(self):
"""
Returns all configured container manager instances.
:returns: dict of container managers
:rtype: dict
"""
return self._handlers
37 changes: 0 additions & 37 deletions src/commissaire_service/storage/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@
import importlib
import json

from time import sleep

import commissaire.models as models

from commissaire import constants as C
Expand Down Expand Up @@ -230,41 +228,6 @@ def on_list_store_handlers(self, message):
})
return result

def on_node_registered(self, message, cluster_type, address):
"""
Checks if a cluster node at the given address is registered on a
cluster of the given type. This method may take several seconds
to complete if the cluster node is unresponsive, as it retries a
few times with a sleep delay.
:param message: A message instance
:type message: kombu.message.Message
:param cluster_type: A cluster type constant
:type cluster_type: str
:param address: Address of the cluster node
:type address: str
:returns: Whether the node is registered
:rtype: bool
"""
for con_mgr in self._manager.list_container_managers(cluster_type):
# Try 3 times waiting 5 seconds each time before giving up.
for attempt in range(3):
if con_mgr.node_registered(address):
self.logger.info(
'{} has been registered with the '
'container manager'.format(address))
return True
if attempt == 2:
self.logger.warn(
'Could not register with the container manager')
return False
self.logger.debug(
'{} has not been registered with the container '
'manager. Checking again in 5 seconds...'.format(
address))
sleep(5)
return False


def main(): # pragma: no cover
"""
Expand Down
Loading

0 comments on commit 5be6096

Please sign in to comment.