Skip to content

Commit

Permalink
Minion-manager unit tests. (argoproj#29)
Browse files Browse the repository at this point in the history
  • Loading branch information
shrinandj authored and wokeGit committed Nov 21, 2017
1 parent da0fbbe commit e4f50c8
Show file tree
Hide file tree
Showing 5 changed files with 392 additions and 9 deletions.
9 changes: 4 additions & 5 deletions .argo/argo-platform-unit-tests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,10 @@ steps:
pytest -vv /src/platform/tests/kubeobject/ &&
pytest -vv /src/platform/tests/util/ &&
pytest -vv /src/platform/tests/lib/ax/platform/ax_asg_test.py &&
pytest -vv /src/platform/tests/axmon/operations_test.py

# TODO (#263): add those tests back later
# python2 -m pytest -vv /src/platform/tests/minion_manager/ax_minion_manager_test.py &&
# python2 -m pytest -vv /src/platform/tests/minion_manager/ax_bid_advisor_test.py
pytest -vv /src/platform/tests/axmon/operations_test.py &&
python2 -m pytest -s -vv /src/platform/tests/minion_manager/aws/aws_minion_manager_test.py &&
python2 -m pytest -s -vv /src/platform/tests/minion_manager/aws/aws_bid_advisor_test.py &&
python2 -m pytest -s -vv /src/platform/tests/minion_manager/broker_test.py
KUBE-MONITOR-TESTS:
template: argo-platform-unit-test-base
arguments:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,6 @@ def __init__(self, scaling_groups, region, **kwargs):
name="MinionManagerRestAPI")
self.rest_thread.setDaemon(True)

self.k8s_client = KubernetesApiClient()

@staticmethod
@retry(wait_exponential_multiplier=1000, stop_max_attempt_number=3)
def describe_asg_with_retries(ac_client, asgs):
Expand Down Expand Up @@ -501,6 +499,7 @@ def get_asg_metas(self):

def rest_api(self):
""" Thread that responds to the Flask api endpoints. """
k8s_client = KubernetesApiClient()
app = Flask("MinionManagerRestAPI")

def _update_config_map(enabled_str, asgs):
Expand All @@ -510,7 +509,7 @@ def _update_config_map(enabled_str, asgs):
if asgs:
cmap.data["MM_SCALING_GROUPS"] = asgs

self.k8s_client.api.replace_namespaced_config_map(
k8s_client.api.replace_namespaced_config_map(
cmap, MM_CONFIG_MAP_NAMESPACE, MM_CONFIG_MAP_NAME)

@app.route('/spot_instance_config', methods=['PUT'])
Expand All @@ -537,7 +536,7 @@ def _update_spot_instances():
@app.route('/spot_instance_config', methods=['GET'])
def _get_spot_instances():
""" Get spot-instances config. """
cmap = self.k8s_client.api.read_namespaced_config_map(
cmap = k8s_client.api.read_namespaced_config_map(
namespace=MM_CONFIG_MAP_NAMESPACE, name=MM_CONFIG_MAP_NAME)
return jsonify({"status": cmap.data["MM_SPOT_INSTANCE_ENABLED"], "asgs": cmap.data["MM_SCALING_GROUPS"]})

Expand Down
128 changes: 128 additions & 0 deletions platform/tests/minion_manager/aws/aws_bid_advisor_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
"""The file has unit tests for the AWSBidAdvisor."""

import unittest

from ax.platform.minion_manager.cloud_provider.aws.aws_bid_advisor import AWSBidAdvisor

REFRESH_INTERVAL = 10
REGION = 'us-west-2'


class AWSBidAdvisorTest(unittest.TestCase):
"""
Tests for AWSBidAdvisor.
"""
def test_ba_lifecycle(self):
"""
Tests that the AWSBidVisor starts threads and stops them correctly.
"""
bidadv = AWSBidAdvisor(REFRESH_INTERVAL, REFRESH_INTERVAL, REGION)
assert len(bidadv.all_bid_advisor_threads) == 0
bidadv.run()
assert len(bidadv.all_bid_advisor_threads) == 2
bidadv.shutdown()
assert len(bidadv.all_bid_advisor_threads) == 0

def test_ba_on_demand_pricing(self):
"""
Tests that the AWSBidVisor correctly gets the on-demand pricing.
"""
bidadv = AWSBidAdvisor(REFRESH_INTERVAL, REFRESH_INTERVAL, REGION)
assert len(bidadv.on_demand_price_dict) == 0
updater = bidadv.OnDemandUpdater(bidadv)
updater.get_on_demand_pricing()
assert len(bidadv.on_demand_price_dict) > 0

def test_ba_spot_pricing(self):
"""
Tests that the AWSBidVisor correctly gets the spot instance pricing.
"""
bidadv = AWSBidAdvisor(REFRESH_INTERVAL, REFRESH_INTERVAL, REGION)
assert len(bidadv.spot_price_list) == 0
updater = bidadv.SpotInstancePriceUpdater(bidadv)
updater.get_spot_price_info()
assert len(bidadv.spot_price_list) > 0

def test_ba_price_update(self):
"""
Tests that the AXBidVisor actually updates the pricing info.
"""
bidadv = AWSBidAdvisor(REFRESH_INTERVAL, REFRESH_INTERVAL, REGION)
od_updater = bidadv.OnDemandUpdater(bidadv)
od_updater.get_on_demand_pricing()

sp_updater = bidadv.SpotInstancePriceUpdater(bidadv)
sp_updater.get_spot_price_info()

# Verify that the pricing info was populated.
assert len(bidadv.on_demand_price_dict) > 0
assert len(bidadv.spot_price_list) > 0

# Make the price dicts empty to check if they get updated.
bidadv.on_demand_price_dict = {}
bidadv.spot_price_list = {}

od_updater.get_on_demand_pricing()
sp_updater.get_spot_price_info()

# Verify that the pricing info is populated again.
assert len(bidadv.on_demand_price_dict) > 0
assert len(bidadv.spot_price_list) > 0

def test_ba_get_bid(self):
"""
Tests that the bid_advisor's get_new_bid() method returns correct
bid information.
"""
bidadv = AWSBidAdvisor(REFRESH_INTERVAL, REFRESH_INTERVAL, REGION)

instance_type = "m3.large"
zone = "us-west-2b"
# Manually populate the prices so that spot-instance prices are chosen.
bidadv.on_demand_price_dict["m3.large"] = "100"
bidadv.spot_price_list = [{'InstanceType': instance_type,
'SpotPrice': '80',
'AvailabilityZone': zone}]
bid_info = bidadv.get_new_bid(zone, instance_type)
assert bid_info is not None, "BidAdvisor didn't return any " + \
"new bid information."
assert bid_info["type"] == "spot"
assert isinstance(bid_info["price"], str)

# Manually populate the prices so that on-demand instances are chosen.
bidadv.spot_price_list = [{'InstanceType': instance_type,
'SpotPrice': '85',
'AvailabilityZone': zone}]
bid_info = bidadv.get_new_bid(zone, instance_type)
assert bid_info is not None, "BidAdvisor didn't return any now " + \
"bid information."
assert bid_info["type"] == "on-demand"

def test_ba_get_bid_no_data(self):
"""
Tests that the BidAdvisor returns the default if the pricing
information hasn't be obtained yet.
"""
bidadv = AWSBidAdvisor(REFRESH_INTERVAL, REFRESH_INTERVAL, REGION)
bid_info = bidadv.get_new_bid('us-west-2a', 'm3.large')
assert bid_info["type"] == "on-demand"

def test_ba_get_current_price(self):
"""
Tests that the BidAdvisor returns the most recent price information.
"""
bidadv = AWSBidAdvisor(REFRESH_INTERVAL, REFRESH_INTERVAL, REGION)

od_updater = bidadv.OnDemandUpdater(bidadv)
od_updater.get_on_demand_pricing()

sp_updater = bidadv.SpotInstancePriceUpdater(bidadv)
sp_updater.get_spot_price_info()

# Verify that the pricing info was populated.
assert len(bidadv.on_demand_price_dict) > 0
assert len(bidadv.spot_price_list) > 0

price_info_map = bidadv.get_current_price()
assert price_info_map["spot"] is not None
assert price_info_map["on-demand"] is not None
Loading

0 comments on commit e4f50c8

Please sign in to comment.