From dd81a367f14bc1b705d193e0844c46569a7e4739 Mon Sep 17 00:00:00 2001 From: Matthew Barnes Date: Thu, 1 Dec 2016 17:11:28 -0500 Subject: [PATCH] Use StorageClient to talk to storage service. This brings max_complexity back down to 15. --- setup.cfg | 2 +- .../clusterexec/__init__.py | 73 +++---------------- .../investigator/__init__.py | 36 ++------- src/commissaire_service/watcher/__init__.py | 21 ++---- test/test_service_watcher.py | 19 ++--- 5 files changed, 28 insertions(+), 123 deletions(-) diff --git a/setup.cfg b/setup.cfg index 7e10592..6744a1b 100644 --- a/setup.cfg +++ b/setup.cfg @@ -1,7 +1,7 @@ [flake8] ignore=E402 exclude=test/* -max-complexity=22 +max-complexity=15 [nosetests] verbosity=2 diff --git a/src/commissaire_service/clusterexec/__init__.py b/src/commissaire_service/clusterexec/__init__.py index 8786974..8382771 100644 --- a/src/commissaire_service/clusterexec/__init__.py +++ b/src/commissaire_service/clusterexec/__init__.py @@ -16,7 +16,8 @@ import json from commissaire.models import ( - ClusterDeploy, ClusterUpgrade, ClusterRestart, Cluster, Host) + ClusterDeploy, ClusterUpgrade, ClusterRestart) +from commissaire.storage.client import StorageClient from commissaire.util.date import formatted_dt from commissaire.util.ssh import TemporarySSHKey @@ -43,6 +44,7 @@ def __init__(self, exchange_name, connection_url): {'routing_key': 'jobs.clusterexec.*'} ] super().__init__(exchange_name, connection_url, queue_kwargs) + self.storage = StorageClient(self) def _execute(self, message, model_instance, command_args, finished_hosts_key): @@ -76,11 +78,7 @@ def _execute(self, message, model_instance, command_args, # Set the initial status in the store. self.logger.info('Setting initial status.') self.logger.debug('Status={}'.format(model_json_data)) - params = { - 'model_type_name': model_instance.__class__.__name__, - 'model_json_data': model_json_data - } - self.request('storage.save', params=params) + self.storage.save(model_instance) # Respond to the caller with the initial status. if message.properties.get('reply_to'): @@ -102,19 +100,7 @@ def _execute(self, message, model_instance, command_args, # Collect all host addresses in the cluster. - try: - cluster = Cluster.new(name=cluster_name) - params = { - 'model_type_name': cluster.__class__.__name__, - 'model_json_data': cluster.to_json() - } - response = self.request('storage.get', params=params) - cluster = Cluster.new(**response['result']) - except Exception as error: - self.logger.warn( - 'Unable to continue for cluster "{}" due to ' - '{}: {}'.format(cluster_name, type(error), error)) - raise error + cluster = self.storage.get_cluster(cluster_name) n_hosts = len(cluster.hostset) if n_hosts: @@ -124,19 +110,7 @@ def _execute(self, message, model_instance, command_args, self.logger.warn('No hosts in cluster "{}"'.format(cluster_name)) for address in cluster.hostset: - try: - host = Host.new(address=address) - params = { - 'model_type_name': host.__class__.__name__, - 'model_json_data': host.to_json() - } - response = self.request('storage.get', params=params) - host = Host.new(**response['result']) - except Exception as error: - self.logger.warn( - 'Unable to get host info for "{}" due to ' - '{}: {}'.format(address, type(error), error)) - raise error + host = self.storage.get_host(address) oscmd = get_oscmd(host.os) @@ -146,17 +120,7 @@ def _execute(self, message, model_instance, command_args, os_command, host.address)) model_instance.in_process.append(host.address) - try: - params = { - 'model_type_name': model_instance.__class__.__name__, - 'model_json_data': model_instance.to_json() - } - self.request('storage.save', params=params) - except Exception as error: - self.logger.error( - 'Unable to save in_process state for "{}" clusterexec ' - 'due to {}: {}'.format(cluster_name, type(error), error)) - raise error + self.storage.save(model_instance) with TemporarySSHKey(host, self.logger) as key: try: @@ -181,17 +145,7 @@ def _execute(self, message, model_instance, command_args, self.logger.warn( 'Host {} was not in_process for {} {}'.format( host.address, command_name, cluster_name)) - try: - params = { - 'model_type_name': model_instance.__class__.__name__, - 'model_json_data': model_instance.to_json() - } - self.request('storage.save', params=params) - except Exception as error: - self.logger.error( - 'Unable to save cluster state for "{}" clusterexec ' - 'due to {}: {}'.format(cluster_name, type(error), error)) - raise error + self.storage.save(model_instance) self.logger.info( 'Finished executing {} for {} in {}'.format( @@ -206,16 +160,7 @@ def _execute(self, message, model_instance, command_args, 'Cluster {} final {} status: {}'.format( cluster_name, command_name, model_instance.to_json())) - try: - params = { - 'model_type_name': model_instance.__class__.__name__, - 'model_json_data': model_instance.to_json() - } - self.request('storage.save', params=params) - except Exception as error: - self.logger.error( - 'Unable to save final state for "{}" clusterexec ' - 'due to {}: {}'.format(cluster_name, type(error), error)) + self.storage.save(model_instance) def on_upgrade(self, message, cluster_name): """ diff --git a/src/commissaire_service/investigator/__init__.py b/src/commissaire_service/investigator/__init__.py index 9b315fa..09a6d6d 100644 --- a/src/commissaire_service/investigator/__init__.py +++ b/src/commissaire_service/investigator/__init__.py @@ -18,6 +18,7 @@ import commissaire.constants as C from commissaire.models import Cluster, Host, Network +from commissaire.storage.client import StorageClient from commissaire.util.config import ConfigurationError from commissaire.util.date import formatted_dt from commissaire.util.ssh import TemporarySSHKey @@ -45,6 +46,7 @@ def __init__(self, exchange_name, connection_url): {'routing_key': 'jobs.investigate'} ] super().__init__(exchange_name, connection_url, queue_kwargs) + self.storage = StorageClient(self) def _get_etcd_config(self): """ @@ -74,13 +76,7 @@ def _get_cluster_and_network_models(self, cluster_data): """ try: cluster = Cluster.new(**cluster_data) - network = Network.new(name=cluster.network) - params = { - 'model_type_name': network.__class__.__name__, - 'model_json_data': network.to_json() - } - response = self.request('storage.get', params=params) - network = Network.new(**response['result']) + network = self.storage.get_network(cluster.network) except TypeError: cluster = Cluster.new(type=C.CLUSTER_TYPE_HOST) network = Network.new(**C.DEFAULT_CLUSTER_NETWORK_JSON) @@ -106,19 +102,7 @@ def on_investigate(self, message, address, cluster_data={}): if cluster_data: self.logger.debug('Related cluster: {}'.format(cluster_data)) - try: - params = { - 'model_type_name': 'Host', - 'model_json_data': Host.new(address=address).to_json() - } - response = self.request('storage.get', params=params) - host = Host.new(**response['result']) - except Exception as error: - self.logger.warn( - 'Unable to continue for {} due to ' - '{}: {}. Returning...'.format(address, type(error), error)) - raise error - + host = self.storage.get_host(address) transport = ansibleapi.Transport(host.remote_user) key = TemporarySSHKey(host, self.logger) @@ -148,11 +132,7 @@ def on_investigate(self, message, address, cluster_data={}): raise error finally: # Save the updated host model. - params = { - 'model_type_name': host.__class__.__name__, - 'model_json_data': host.to_json() - } - self.request('storage.save', params=params) + self.storage.save(host) self.logger.info( 'Finished and stored investigation data for {}'.format(address)) @@ -188,11 +168,7 @@ def on_investigate(self, message, address, cluster_data={}): raise error finally: # Save the updated host model. - params = { - 'model_type_name': host.__class__.__name__, - 'model_json_data': host.to_json() - } - self.request('storage.save', params=params) + self.storage.save(host) # Verify association with relevant container managers # FIXME Adapt this for ContainerManagerConfig. diff --git a/src/commissaire_service/watcher/__init__.py b/src/commissaire_service/watcher/__init__.py index 53fddb1..49d77ef 100644 --- a/src/commissaire_service/watcher/__init__.py +++ b/src/commissaire_service/watcher/__init__.py @@ -22,7 +22,8 @@ from time import sleep from commissaire import constants as C -from commissaire.models import Host, WatcherRecord +from commissaire.models import WatcherRecord +from commissaire.storage.client import StorageClient from commissaire.util.date import formatted_dt from commissaire.util.ssh import TemporarySSHKey @@ -52,6 +53,7 @@ def __init__(self, exchange_name, connection_url): # Store the last address seen for backoff self.last_address = None super().__init__(exchange_name, connection_url, queue_kwargs) + self.storage = StorageClient(self) def on_message(self, body, message): """ @@ -101,18 +103,8 @@ def _check(self, address): # http://commissaire.readthedocs.org/en/latest/enums.html#host-statuses self.logger.info('Checking host "{}".'.format(address)) - try: - response = self.request('storage.get', params={ - 'model_type_name': 'Host', - 'model_json_data': Host.new(address=address).to_json() - }) - host = Host.new(**response['result']) - except Exception as error: - self.logger.warn( - 'Unable to continue for host "{}" due to ' - '{}: {}. Returning...'.format(address, type(error), error)) - raise error + host = self.storage.get_host(address) transport = ansibleapi.Transport(host.remote_user) with TemporarySSHKey(host, self.logger) as key: @@ -133,10 +125,7 @@ def _check(self, address): raise error finally: # Save the model - self.request('storage.save', params={ - 'model_type_name': host.__class__.__name__, - 'model_json_data': host.to_json(), - }) + self.storage.save(host) self.logger.info( 'Finished watcher run for host "{}"'.format(address)) diff --git a/test/test_service_watcher.py b/test/test_service_watcher.py index 3f656c9..4caf707 100644 --- a/test/test_service_watcher.py +++ b/test/test_service_watcher.py @@ -122,18 +122,13 @@ def test__check_with_no_errors(self): 'commissaire_service.transport.ansibleapi.Transport') as _transport: transport = _transport() - self.service_instance.request = mock.MagicMock( - side_effect=( - # Getting the host - {'result': { - 'address': '127.0.0.1', - 'last_check': datetime.datetime.min.isoformat() - }}, - # The save - None)) + self.service_instance.storage = mock.MagicMock() + self.service_instance.storage.get_host.return_value = models.Host.new( + address='127.0.0.1', + last_check=datetime.datetime.min.isoformat()) + self.service_instance.storage.save.return_value = None self.service_instance._check('127.0.0.1') # The transport method should have been called once self.assertEquals(1, transport.check_host_availability.call_count) - # Verify the last request was a save - self.service_instance.request.assert_called_with( - 'storage.save', params=mock.ANY) + # Verify 'storage.save' got called + self.service_instance.storage.save.assert_called_with(mock.ANY)