Skip to content
This repository has been archived by the owner on Mar 22, 2018. It is now read-only.

Commit

Permalink
Use StorageClient to talk to storage service.
Browse files Browse the repository at this point in the history
This brings max_complexity back down to 15.
  • Loading branch information
mbarnes authored and ashcrow committed Dec 5, 2016
1 parent 339b4f6 commit dd81a36
Show file tree
Hide file tree
Showing 5 changed files with 28 additions and 123 deletions.
2 changes: 1 addition & 1 deletion setup.cfg
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[flake8]
ignore=E402
exclude=test/*
max-complexity=22
max-complexity=15

[nosetests]
verbosity=2
Expand Down
73 changes: 9 additions & 64 deletions src/commissaire_service/clusterexec/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@
import json

from commissaire.models import (
ClusterDeploy, ClusterUpgrade, ClusterRestart, Cluster, Host)
ClusterDeploy, ClusterUpgrade, ClusterRestart)
from commissaire.storage.client import StorageClient
from commissaire.util.date import formatted_dt
from commissaire.util.ssh import TemporarySSHKey

Expand All @@ -43,6 +44,7 @@ def __init__(self, exchange_name, connection_url):
{'routing_key': 'jobs.clusterexec.*'}
]
super().__init__(exchange_name, connection_url, queue_kwargs)
self.storage = StorageClient(self)

def _execute(self, message, model_instance, command_args,
finished_hosts_key):
Expand Down Expand Up @@ -76,11 +78,7 @@ def _execute(self, message, model_instance, command_args,
# Set the initial status in the store.
self.logger.info('Setting initial status.')
self.logger.debug('Status={}'.format(model_json_data))
params = {
'model_type_name': model_instance.__class__.__name__,
'model_json_data': model_json_data
}
self.request('storage.save', params=params)
self.storage.save(model_instance)

# Respond to the caller with the initial status.
if message.properties.get('reply_to'):
Expand All @@ -102,19 +100,7 @@ def _execute(self, message, model_instance, command_args,

# Collect all host addresses in the cluster.

try:
cluster = Cluster.new(name=cluster_name)
params = {
'model_type_name': cluster.__class__.__name__,
'model_json_data': cluster.to_json()
}
response = self.request('storage.get', params=params)
cluster = Cluster.new(**response['result'])
except Exception as error:
self.logger.warn(
'Unable to continue for cluster "{}" due to '
'{}: {}'.format(cluster_name, type(error), error))
raise error
cluster = self.storage.get_cluster(cluster_name)

n_hosts = len(cluster.hostset)
if n_hosts:
Expand All @@ -124,19 +110,7 @@ def _execute(self, message, model_instance, command_args,
self.logger.warn('No hosts in cluster "{}"'.format(cluster_name))

for address in cluster.hostset:
try:
host = Host.new(address=address)
params = {
'model_type_name': host.__class__.__name__,
'model_json_data': host.to_json()
}
response = self.request('storage.get', params=params)
host = Host.new(**response['result'])
except Exception as error:
self.logger.warn(
'Unable to get host info for "{}" due to '
'{}: {}'.format(address, type(error), error))
raise error
host = self.storage.get_host(address)

oscmd = get_oscmd(host.os)

Expand All @@ -146,17 +120,7 @@ def _execute(self, message, model_instance, command_args,
os_command, host.address))

model_instance.in_process.append(host.address)
try:
params = {
'model_type_name': model_instance.__class__.__name__,
'model_json_data': model_instance.to_json()
}
self.request('storage.save', params=params)
except Exception as error:
self.logger.error(
'Unable to save in_process state for "{}" clusterexec '
'due to {}: {}'.format(cluster_name, type(error), error))
raise error
self.storage.save(model_instance)

with TemporarySSHKey(host, self.logger) as key:
try:
Expand All @@ -181,17 +145,7 @@ def _execute(self, message, model_instance, command_args,
self.logger.warn(
'Host {} was not in_process for {} {}'.format(
host.address, command_name, cluster_name))
try:
params = {
'model_type_name': model_instance.__class__.__name__,
'model_json_data': model_instance.to_json()
}
self.request('storage.save', params=params)
except Exception as error:
self.logger.error(
'Unable to save cluster state for "{}" clusterexec '
'due to {}: {}'.format(cluster_name, type(error), error))
raise error
self.storage.save(model_instance)

self.logger.info(
'Finished executing {} for {} in {}'.format(
Expand All @@ -206,16 +160,7 @@ def _execute(self, message, model_instance, command_args,
'Cluster {} final {} status: {}'.format(
cluster_name, command_name, model_instance.to_json()))

try:
params = {
'model_type_name': model_instance.__class__.__name__,
'model_json_data': model_instance.to_json()
}
self.request('storage.save', params=params)
except Exception as error:
self.logger.error(
'Unable to save final state for "{}" clusterexec '
'due to {}: {}'.format(cluster_name, type(error), error))
self.storage.save(model_instance)

def on_upgrade(self, message, cluster_name):
"""
Expand Down
36 changes: 6 additions & 30 deletions src/commissaire_service/investigator/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import commissaire.constants as C

from commissaire.models import Cluster, Host, Network
from commissaire.storage.client import StorageClient
from commissaire.util.config import ConfigurationError
from commissaire.util.date import formatted_dt
from commissaire.util.ssh import TemporarySSHKey
Expand Down Expand Up @@ -45,6 +46,7 @@ def __init__(self, exchange_name, connection_url):
{'routing_key': 'jobs.investigate'}
]
super().__init__(exchange_name, connection_url, queue_kwargs)
self.storage = StorageClient(self)

def _get_etcd_config(self):
"""
Expand Down Expand Up @@ -74,13 +76,7 @@ def _get_cluster_and_network_models(self, cluster_data):
"""
try:
cluster = Cluster.new(**cluster_data)
network = Network.new(name=cluster.network)
params = {
'model_type_name': network.__class__.__name__,
'model_json_data': network.to_json()
}
response = self.request('storage.get', params=params)
network = Network.new(**response['result'])
network = self.storage.get_network(cluster.network)
except TypeError:
cluster = Cluster.new(type=C.CLUSTER_TYPE_HOST)
network = Network.new(**C.DEFAULT_CLUSTER_NETWORK_JSON)
Expand All @@ -106,19 +102,7 @@ def on_investigate(self, message, address, cluster_data={}):
if cluster_data:
self.logger.debug('Related cluster: {}'.format(cluster_data))

try:
params = {
'model_type_name': 'Host',
'model_json_data': Host.new(address=address).to_json()
}
response = self.request('storage.get', params=params)
host = Host.new(**response['result'])
except Exception as error:
self.logger.warn(
'Unable to continue for {} due to '
'{}: {}. Returning...'.format(address, type(error), error))
raise error

host = self.storage.get_host(address)
transport = ansibleapi.Transport(host.remote_user)

key = TemporarySSHKey(host, self.logger)
Expand Down Expand Up @@ -148,11 +132,7 @@ def on_investigate(self, message, address, cluster_data={}):
raise error
finally:
# Save the updated host model.
params = {
'model_type_name': host.__class__.__name__,
'model_json_data': host.to_json()
}
self.request('storage.save', params=params)
self.storage.save(host)

self.logger.info(
'Finished and stored investigation data for {}'.format(address))
Expand Down Expand Up @@ -188,11 +168,7 @@ def on_investigate(self, message, address, cluster_data={}):
raise error
finally:
# Save the updated host model.
params = {
'model_type_name': host.__class__.__name__,
'model_json_data': host.to_json()
}
self.request('storage.save', params=params)
self.storage.save(host)

# Verify association with relevant container managers
# FIXME Adapt this for ContainerManagerConfig.
Expand Down
21 changes: 5 additions & 16 deletions src/commissaire_service/watcher/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@
from time import sleep

from commissaire import constants as C
from commissaire.models import Host, WatcherRecord
from commissaire.models import WatcherRecord
from commissaire.storage.client import StorageClient
from commissaire.util.date import formatted_dt
from commissaire.util.ssh import TemporarySSHKey

Expand Down Expand Up @@ -52,6 +53,7 @@ def __init__(self, exchange_name, connection_url):
# Store the last address seen for backoff
self.last_address = None
super().__init__(exchange_name, connection_url, queue_kwargs)
self.storage = StorageClient(self)

def on_message(self, body, message):
"""
Expand Down Expand Up @@ -101,18 +103,8 @@ def _check(self, address):
# http://commissaire.readthedocs.org/en/latest/enums.html#host-statuses

self.logger.info('Checking host "{}".'.format(address))
try:
response = self.request('storage.get', params={
'model_type_name': 'Host',
'model_json_data': Host.new(address=address).to_json()
})
host = Host.new(**response['result'])
except Exception as error:
self.logger.warn(
'Unable to continue for host "{}" due to '
'{}: {}. Returning...'.format(address, type(error), error))
raise error

host = self.storage.get_host(address)
transport = ansibleapi.Transport(host.remote_user)

with TemporarySSHKey(host, self.logger) as key:
Expand All @@ -133,10 +125,7 @@ def _check(self, address):
raise error
finally:
# Save the model
self.request('storage.save', params={
'model_type_name': host.__class__.__name__,
'model_json_data': host.to_json(),
})
self.storage.save(host)
self.logger.info(
'Finished watcher run for host "{}"'.format(address))

Expand Down
19 changes: 7 additions & 12 deletions test/test_service_watcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,18 +122,13 @@ def test__check_with_no_errors(self):
'commissaire_service.transport.ansibleapi.Transport') as _transport:
transport = _transport()

self.service_instance.request = mock.MagicMock(
side_effect=(
# Getting the host
{'result': {
'address': '127.0.0.1',
'last_check': datetime.datetime.min.isoformat()
}},
# The save
None))
self.service_instance.storage = mock.MagicMock()
self.service_instance.storage.get_host.return_value = models.Host.new(
address='127.0.0.1',
last_check=datetime.datetime.min.isoformat())
self.service_instance.storage.save.return_value = None
self.service_instance._check('127.0.0.1')
# The transport method should have been called once
self.assertEquals(1, transport.check_host_availability.call_count)
# Verify the last request was a save
self.service_instance.request.assert_called_with(
'storage.save', params=mock.ANY)
# Verify 'storage.save' got called
self.service_instance.storage.save.assert_called_with(mock.ANY)

0 comments on commit dd81a36

Please sign in to comment.