Skip to content
This repository has been archived by the owner on Mar 22, 2018. It is now read-only.

Commit

Permalink
Watcher Port (#25)
Browse files Browse the repository at this point in the history
* ansible: Use entire inventory for availability check.

* CommissaireService simplification.

- _wrap_on_message has been renamed to on_message
- on_message handles non-rpc messages as it did before
- Minor logging improvements

* Added watcher service.

The watcher service is a port of the original MVP watcher job but taking
advantage of the global queuing system we now have.

* test: Disable ansible code test coverage.

Re-enable at a later date.
  • Loading branch information
ashcrow authored and mbarnes committed Oct 24, 2016
1 parent 9519800 commit f6852e2
Show file tree
Hide file tree
Showing 8 changed files with 342 additions and 36 deletions.
3 changes: 3 additions & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,9 @@ def extract_requirements(filename):
'commissaire_service.storage:main'),
('commissaire-investigator-service = '
'commissaire_service.investigator:main'),
('commissaire-watcher-service = '
'commissaire_service.watcher:main'),

],
}
)
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
---
- hosts: commissaire_targets
- hosts: all
name: Check Host Availability
gather_facts: no
tasks:
Expand Down
32 changes: 11 additions & 21 deletions src/commissaire_service/service/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ def run(self):

class CommissaireService(ConsumerMixin, BusMixin):
"""
An example prototype CommissaireService base class.
Commissaire service class.
"""

def __init__(self, exchange_name, connection_url, qkwargs):
Expand All @@ -127,8 +127,9 @@ def __init__(self, exchange_name, connection_url, qkwargs):
:param qkwargs: One or more dicts keyword arguments for queue creation
:type qkwargs: list
"""
self.logger = logging.getLogger(self.__class__.__name__)
self.logger.debug('Initializing {0}'.format(self.__class__.__name__))
name = self.__class__.__name__
self.logger = logging.getLogger(name)
self.logger.debug('Initializing {0}'.format(name))
self.connection = Connection(connection_url)
self._channel = self.connection.channel()
self._exchange = Exchange(
Expand All @@ -146,7 +147,7 @@ def __init__(self, exchange_name, connection_url, qkwargs):

# Create producer for publishing on topics
self.producer = Producer(self._channel, self._exchange)
self.logger.debug('Initializing finished')
self.logger.debug('Initializing of {} finished'.format(name))

def get_consumers(self, Consumer, channel):
"""
Expand All @@ -164,26 +165,13 @@ def get_consumers(self, Consumer, channel):
for queue in self._queues:
self.logger.debug('Will consume on {0}'.format(queue.name))
consumers.append(
Consumer(queue, callbacks=[self._wrap_on_message]))
Consumer(queue, callbacks=[self.on_message]))
self.logger.debug('Consumers: {}'.format(consumers))
return consumers

def on_message(self, body, message):
"""
Called when a non-jsonrpc message arrives.
:param body: Body of the message.
:type body: dict
:param message: The message instance.
:type message: kombu.message.Message
"""
self.logger.error(
'Dropping unknown message: payload="{}", properties="{}"'.format(
body, message.properties))

def _wrap_on_message(self, body, message):
"""
Wraps on_message for jsonrpc routing and logging.
Called when a new message arrives.
:param body: Body of the message.
:type body: dict or json string
Expand Down Expand Up @@ -218,9 +206,11 @@ def _wrap_on_message(self, body, message):

self.logger.debug('Result for "{}": "{}"'.format(
response['id'], result))
# Otherwise send it to on_message
else:
self.on_message(body, message)
# Drop it
self.logger.error(
'Dropping unknown message: payload="{}", '
'properties="{}"'.format(body, message.properties))
except Exception as error:
jsonrpc_error_code = -32600
# If there is an attribute error then use the Method Not Found
Expand Down
6 changes: 3 additions & 3 deletions src/commissaire_service/transport/ansible_wrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
logger.addHandler(handler)


def get_inventory_file(hosts):
def get_inventory_file(hosts): # pragma: no cover
"""
Set up an --inventory-file option for the Ansible CLI.
Expand All @@ -52,7 +52,7 @@ def get_inventory_file(hosts):
return ['--inventory-file', hosts]


def gather_facts(host, args=[]):
def gather_facts(host, args=[]): # pragma: no cover
"""
Returns a dictionary of facts gathered by Ansible from a host.
Expand Down Expand Up @@ -86,7 +86,7 @@ def gather_facts(host, args=[]):
raise error


def execute_playbook(playbook, hosts, args=[]):
def execute_playbook(playbook, hosts, args=[]): # pragma: no cover
"""
Executes a playbook file for the given set of hosts, passing any
additional command-line arguments to the playbook command.
Expand Down
2 changes: 1 addition & 1 deletion src/commissaire_service/transport/ansibleapi.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
from .ansible_wrapper import gather_facts, execute_playbook


class Transport:
class Transport: # pragma: no cover
"""
Transport using Ansible.
"""
Expand Down
174 changes: 174 additions & 0 deletions src/commissaire_service/watcher/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
# Copyright (C) 2016 Red Hat, Inc
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
The host node watcher.
"""

import json

from datetime import datetime, timedelta
from time import sleep

from commissaire import constants as C
from commissaire.models import Host, WatcherRecord
from commissaire.util.ssh import TemporarySSHKey

from commissaire_service.service import CommissaireService
from commissaire_service.transport import ansibleapi


class WatcherService(CommissaireService):
"""
Periodically connects to hosts to check their status.
"""

def __init__(self, exchange_name, connection_url):
"""
Creates a new WatcherService.
:param exchange_name: Name of the topic exchange
:type exchange_name: str
:param connection_url: Kombu connection URL
:type connection_url: str
"""
queue_kwargs = [{
'name': 'watcher',
'exclusive': False,
'routing_key': 'jobs.watcher',
}]
# Store the last address seen for backoff
self.last_address = None
super().__init__(exchange_name, connection_url, queue_kwargs)

def on_message(self, body, message):
"""
Called when a non-jsonrpc message arrives.
:param body: Body of the message.
:type body: dict
:param message: The message instance.
:type message: kombu.message.Message
"""
record = WatcherRecord(**json.loads(body))
# Ack the message so it does not requeue on it's own
message.ack()
self.logger.debug(
'Checking on WatcherQueue item: {}'.format(record.to_json()))
if datetime.strptime(record.last_check, C.DATE_FORMAT) < (
datetime.utcnow() - timedelta(minutes=1)):
try:
self._check(record.address)
except Exception as error:
self.logger.debug('Error: {}: {}'.format(type(error), error))
record.last_check = datetime.utcnow().isoformat()
else:
if self.last_address == record.address:
# Since we got the same address we could process twice
# back off a little extra
self.logger.debug(
'Got "{}" twice. Backing off...'.format(record.address))
sleep(10)
else:
# Since the top item wasn't ready for processing sleep a bit
sleep(2)
self.last_address = record.address
# Requeue the host
self.producer.publish(record.to_json(), 'jobs.watcher')

def _check(self, address):
"""
Initiates an check on the requested host.
:param address: Host address to investigate
:type address: str
:param cluster_data: Optional data for the associated cluster
:type cluster_data: dict
"""
# Statuses follow:
# http://commissaire.readthedocs.org/en/latest/enums.html#host-statuses

self.logger.info('Checking host "{}".'.format(address))
try:
response = self.request('storage.get', params={
'model_type_name': 'Host',
'model_json_data': Host.new(address=address).to_json(),
'secure': True,
})
host = Host.new(**response['result'])
except Exception as error:
self.logger.warn(
'Unable to continue for host "{}" due to '
'{}: {}. Returning...'.format(address, type(error), error))
raise error

transport = ansibleapi.Transport(host.remote_user)

with TemporarySSHKey(host, self.logger) as key:
try:
self.logger.debug(
'Starting watcher run for host "{}"'.format(address))
result = transport.check_host_availability(host, key.path)
host.last_check = datetime.utcnow().isoformat()
self.logger.debug(
'Watcher result for host {}: {}'.format(address, result))
except Exception as error:
self.logger.warn(
'Failed to connect to host node "{}"'.format(address))
self.logger.debug(
'Watcher failed for host node "{}" with {}: {}'.format(
address, str(error), error))
host.status = 'failed'
raise error
finally:
# Save the model
self.request('storage.save', params={
'model_type_name': host.__class__.__name__,
'model_json_data': host.to_json(),
})
self.logger.info(
'Finished watcher run for host "{}"'.format(address))


def main(): # pragma: no cover
"""
Main entry point.
"""
import argparse

parser = argparse.ArgumentParser()
parser.add_argument(
'--bus-exchange', type=str, default='commissaire',
help='Message bus exchange name.')
parser.add_argument(
'--bus-uri', type=str, metavar='BUS_URI',
default='redis://127.0.0.1:6379/', # FIXME Remove before release
help=(
'Message bus connection URI. See:'
'http://kombu.readthedocs.io/en/latest/userguide/connections.html')
)

args = parser.parse_args()

try:
service = WatcherService(
exchange_name=args.bus_exchange,
connection_url=args.bus_uri)
service.run()
except KeyboardInterrupt:
pass


if __name__ == '__main__':
main()
20 changes: 10 additions & 10 deletions test/test_service_commissaireservice.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ def test_get_consumers(self):
self.assertEquals(1, len(consumers))
# With 1 callback pointing to the message wrapper
Consumer.assert_called_once_with(
mock.ANY, callbacks=[self.service_instance._wrap_on_message])
mock.ANY, callbacks=[self.service_instance.on_message])

def test_on_message(self):
"""
Expand Down Expand Up @@ -124,9 +124,9 @@ def test_responds(self):
self.service_instance.connection.SimpleQueue.__call__(
).close.assert_called_once_with()

def test__wrap_on_message_with_exposed_method(self):
def test_on_message_with_exposed_method(self):
"""
Verify ServiceManager._wrap_on_message routes requests properly.
Verify ServiceManager.on_message routes requests properly.
"""
body = {
'jsonrpc': '2.0',
Expand All @@ -139,14 +139,14 @@ def test__wrap_on_message_with_exposed_method(self):
properties={'reply_to': 'test_queue'},
delivery_info={'routing_key': 'test.method'})
self.service_instance.on_method = mock.MagicMock(return_value='{}')
self.service_instance._wrap_on_message(body, message)
self.service_instance.on_message(body, message)
# The on_method should have been called
self.service_instance.on_method.assert_called_once_with(
kwarg='value', message=message)

def test__wrap_on_message_without_exposed_method(self):
def test_on_message_without_exposed_method(self):
"""
Verify ServiceManager._wrap_on_message returns error if method doesn't exist.
Verify ServiceManager.on_message returns error if method doesn't exist.
"""
body = {
'jsonrpc': '2.0',
Expand All @@ -158,18 +158,18 @@ def test__wrap_on_message_without_exposed_method(self):
payload=body,
properties={'reply_to': 'test_queue'},
delivery_info={'routing_key': 'test.doesnotexist'})
self.service_instance._wrap_on_message(body, message)
self.service_instance.on_message(body, message)
self.service_instance.connection.SimpleQueue.assert_called_once_with(
'test_queue')

def test__wrap_on_message_with_bad_message(self):
def test_on_message_with_bad_message(self):
"""
Verify ServiceManager._wrap_on_message forwards to on_message on non jsonrpc messages.
Verify ServiceManager.on_message forwards to on_message on non jsonrpc messages.
"""
self.service_instance.on_message = mock.MagicMock()
body = '[]'
message = mock.MagicMock(
payload=body,
properties={'reply_to': 'test_queue'})
self.service_instance._wrap_on_message(body, message)
self.service_instance.on_message(body, message)
self.assertEquals(1, self.service_instance.on_message.call_count)
Loading

0 comments on commit f6852e2

Please sign in to comment.