Skip to content

Commit

Permalink
Thread Improvements (mesosphere#235)
Browse files Browse the repository at this point in the history
* Add thread timeout

* Add documentation and break scale creation into more methods

Add documentation throughout the scale tool, both at the function
and line level.

The initial function to create scale workloads is too big and
this begins to break some of the functionality out into smaller
functions.

* Capture result from thread runs

Add custom thread type that stores result of run() so that
failures can be detected and reported. This is used to
determine what Jenkins instances had issues and can be skipped
in future steps.

Move timeout and error detection into its own function so both
setup and teardown can use it. Any unsuccessful scenarios will be
reported through the logger at the warning level.

* Creating jobs is now threaded
  • Loading branch information
colin-msphere authored May 22, 2018
1 parent 89a7973 commit 310f419
Showing 1 changed file with 168 additions and 39 deletions.
207 changes: 168 additions & 39 deletions tests/scale/test_load.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,9 @@
"""

import logging
import threading
import time
from threading import Thread
from typing import List, Set
from xml.etree import ElementTree

import config
Expand All @@ -44,6 +46,33 @@
log = logging.getLogger(__name__)

SHARED_ROLE = "jenkins-role"
# initial timeout waiting on deployments
DEPLOY_TIMEOUT = 15 * 60 # 15 mins
JOB_RUN_TIMEOUT = 10 * 60 # 10 mins


class ResultThread(Thread):
"""A thread that stores the result of the run command."""

def __init__(self, *args, **kwargs) -> None:
super().__init__(*args, **kwargs)
self._result = None

@property
def result(self) -> bool:
"""Run result
Returns: True if completed successfully.
"""
return bool(self._result)

def run(self) -> None:
try:
super().run()
self._result = True
except Exception as e:
self._result = False


@pytest.mark.scale
Expand All @@ -55,7 +84,7 @@ def test_scaling_load(master_count,
work_duration,
mom,
external_volume: bool,
scenario):
scenario) -> None:
"""Launch a load test scenario. This does not verify the results
of the test, but does ensure the instances and jobs were created.
Expand All @@ -80,29 +109,29 @@ def test_scaling_load(master_count,
masters = ["jenkins{}".format(sdk_utils.random_string()) for _ in
range(0, int(master_count))]
# launch Jenkins services
install_threads = list()
for service_name in masters:
t = threading.Thread(target=_install_jenkins,
args=(service_name, mom, external_volume))
install_threads.append(t)
t.start()
# wait on installation threads
for thread in install_threads:
thread.join()
# now try to launch jobs
for service_name in masters:
m_label = _create_executor_configuration(service_name)
_launch_jobs(service_name,
jobs=job_count,
single=single_use,
delay=run_delay,
duration=work_duration,
label=m_label,
scenario=scenario)
install_threads = _spawn_threads(masters,
_install_jenkins,
external_volume=external_volume,
mom=mom,
daemon=True)
thread_failures = _wait_and_get_failures(install_threads,
timeout=DEPLOY_TIMEOUT)
thread_names = [x.name for x in thread_failures]

# the rest of the commands require a running Jenkins instance
deployed_masters = [x for x in masters if x not in thread_names]
job_threads = _spawn_threads(deployed_masters,
_create_jobs,
jobs=job_count,
single=single_use,
delay=run_delay,
duration=work_duration,
scenario=scenario)
_wait_on_threads(job_threads, JOB_RUN_TIMEOUT)


@pytest.mark.scalecleanup
def test_cleanup_scale(mom):
def test_cleanup_scale(mom) -> None:
"""Blanket clean-up of jenkins instances on a DC/OS cluster.
1. Queries Marathon for all apps matching "jenkins" prefix
Expand All @@ -113,20 +142,20 @@ def test_cleanup_scale(mom):
jenkins_apps = r.json()['apps']
jenkins_ids = [x['id'] for x in jenkins_apps]

cleanup_threads = list()
service_ids = list()
for service_id in jenkins_ids:
if service_id.startswith('/'):
service_id = service_id[1:]
# skip over '/jenkins' instance - not setup by tests
if service_id == 'jenkins':
continue
t = threading.Thread(target=_cleanup_jenkins_install,
args=(service_id, mom))
cleanup_threads.append(t)
t.start()
# wait for cleanup to complete
for thread in cleanup_threads:
thread.join()
service_ids.append(service_id)

cleanup_threads = _spawn_threads(service_ids,
_cleanup_jenkins_install,
mom=mom,
daemon=False)
_wait_and_get_failures(cleanup_threads, timeout=JOB_RUN_TIMEOUT)


def _setup_quota(role, cpus):
Expand All @@ -150,33 +179,80 @@ def _set_quota(role, cpus):
sdk_quota.create_quota(role, cpus=cpus)


def _spawn_threads(names, target, daemon=False, **kwargs) -> List[ResultThread]:
"""Create and start threads running target. This will pass
the thread name to the target as the first argument.
Args:
names: Thread names
target: Function to run in thread
**kwargs: Keyword args for target
Returns:
List of threads handling target.
"""
thread_list = list()
for service_name in names:
# setDaemon allows the main thread to exit even if
# these threads are still running.
t = ResultThread(target=target,
daemon=daemon,
name=service_name,
args=(service_name,),
kwargs=kwargs)
thread_list.append(t)
t.start()
return thread_list


def _install_jenkins(service_name, mom=None, external_volume=None):
"""Install Jenkins service.
Args:
service_name: Service Name or Marathon ID (same thing)
mom: Marathon on Marathon instance name
external_volume: Enable external volumes
"""
log.info("Installing jenkins '{}'".format(service_name))
jenkins.install(service_name, role=SHARED_ROLE, mom=mom, external_volume=None)
try:
jenkins.install(service_name, role=SHARED_ROLE, mom=mom,
external_volume=external_volume)
except Exception as e:
log.warning("Error encountered while installing Jenkins: {}".format(e))
raise e


def _cleanup_jenkins_install(service_name, mom=None):
def _cleanup_jenkins_install(service_name, **kwargs):
"""Delete all jobs and uninstall Jenkins instance.
Args:
service_name: Service name or Marathon ID
"""
if service_name.startswith('/'):
service_name = service_name[1:]
log.info("Removing all jobs on {}.".format(service_name))
jenkins.delete_all_jobs(service_name, retry=False)
log.info("Uninstalling {}.".format(service_name))
jenkins.uninstall(service_name,
package_name=config.PACKAGE_NAME,
mom=mom)
try:
log.info("Removing all jobs on {}.".format(service_name))
jenkins.delete_all_jobs(service_name, retry=False)
finally:
log.info("Uninstalling {}.".format(service_name))
jenkins.uninstall(service_name,
package_name=config.PACKAGE_NAME,
**kwargs)


def _create_jobs(service_name, **kwargs):
"""Create jobs on deployed Jenkins instances.
All functionality around creating jobs should go here.
def _create_executor_configuration(service_name):
Args:
service_name: Jenkins instance name
"""
m_label = _create_executor_configuration(service_name)
_launch_jobs(service_name, label=m_label, **kwargs)


def _create_executor_configuration(service_name: str) -> str:
"""Create a new Mesos Slave Info configuration with a random name.
Args:
Expand Down Expand Up @@ -233,3 +309,56 @@ def _launch_jobs(service_name: str,
'EVERY_XMIN': str(delay),
'SLEEP_DURATION': str(duration),
'SCENARIO': scenario})


def _wait_on_threads(thread_list: List[Thread],
timeout=DEPLOY_TIMEOUT) -> List[Thread]:
"""Wait on the threads in `install_threads` until a specified time
has elapsed.
Args:
thread_list: List of threads
timeout: Timeout is seconds
Returns:
List of threads that are still running.
"""
start_time = current_time = time.time()
for thread in thread_list:
remaining = timeout - (current_time - start_time)
if remaining < 1:
break
thread.join(timeout=remaining)
current_time = time.time()
active_threads = [x for x in thread_list if x.isAlive()]
return active_threads


def _wait_and_get_failures(thread_list: List[ResultThread],
**kwargs) -> Set[Thread]:
"""Wait on threads to complete or timeout and log errors.
Args:
thread_list: List of threads to wait on
Returns: A list of service names that failed or timed out.
"""
timeout_failures = _wait_on_threads(thread_list, **kwargs)
timeout_names = [x.name for x in timeout_failures]
if timeout_names:
log.warning("The following {:d} Jenkins instance(s) failed to "
"complete in {:d} minutes: {}"
.format(len(timeout_names),
DEPLOY_TIMEOUT // 60,
', '.join(timeout_names)))
# the following did not timeout, but failed
run_failures = [x for x in thread_list if not x.result]
run_fail_names = [x.name for x in run_failures]
if run_fail_names:
log.warning("The following {:d} Jenkins instance(s) "
"encountered an error: {}"
.format(len(run_fail_names),
', '.join(run_fail_names)))
return set(timeout_failures + run_failures)

0 comments on commit 310f419

Please sign in to comment.