
Extended metadata testing #78

Open

wants to merge 26 commits into base: release_20.01

Commits (26)
b5f5a3f
Reduce duplication in test_kubernetes_staging
jmchilton Jan 16, 2020
310fe87
Eliminate need for fixed port in Kubernetes staging tests.
jmchilton Apr 1, 2020
9c81fd9
Refactor galaxy instance ID parsing for k8s for reuse.
jmchilton Mar 30, 2020
10e0847
Refactor local job cancel test case for reuse.
jmchilton Mar 30, 2020
7972d8b
Set random instance ID in K8S staging tests.
jmchilton Mar 30, 2020
a0a8e08
Test case for stopping Pulsar Kubernetes job.
jmchilton Apr 1, 2020
5d67041
A couple more docs for Kuberentes Pulsar execution.
jmchilton Apr 1, 2020
8c01d57
Run kubernetes staging test only if AMQP_URL is set.
jmchilton Apr 2, 2020
9dc7d7c
Rev pulsar for fixes.
jmchilton Apr 2, 2020
1a01de6
Log node state if history doesn't finish
mvdbeek Apr 2, 2020
776221e
Delete 15G dotnet folder, use minikube 1.9.0
mvdbeek Apr 2, 2020
405f142
Lint fix?
jmchilton Apr 13, 2020
af7affa
Outline a test of kubernetes staging + interactive tool.
jmchilton Mar 26, 2020
1b9943b
Fix for interactivetool_proxy_host added in 9504.
jmchilton Apr 12, 2020
82554d5
Have Pulsar in Kubernetes mode poll for ITs if needed.
jmchilton Apr 14, 2020
afa07fe
Use namespaces in Kubernetes tests.
jmchilton Apr 13, 2020
85c0d11
Add test case for extended metadata with two container pod testing.
jmchilton Jan 16, 2020
acf1d9e
Python 3 compat. exception handling in driver_util?
jmchilton Jan 29, 2020
21dac3e
Better options GALAXY_TEST_KUBERNETES_INFRASTRUCTURE_HOST.
jmchilton Feb 26, 2020
930737e
WIP: Option to bind test driver default_web_host to 0.0.0.0.
jmchilton Feb 12, 2020
72ca24e
Rebase out - extra debug we don't need long term.
jmchilton Feb 26, 2020
3c39f79
REBASE OUT - only run one staging test
jmchilton Feb 19, 2020
e84531d
REBASE OUT: Restrict tests to speed up github action testing.
jmchilton Feb 27, 2020
aeae44d
Try binding uwsgi to 0.0.0.0 for Kubernetes staging tests.
jmchilton Mar 12, 2020
adb0e86
Fix interactive tools on Kubernetes.
jmchilton Mar 24, 2020
57e718a
Testing junk - some might be good though.
jmchilton Apr 2, 2020
15 changes: 10 additions & 5 deletions .github/workflows/integration.yaml
@@ -10,7 +10,7 @@ jobs:
strategy:
matrix:
python-version: [3.7]
subset: ['upload_datatype', 'extended_metadata', 'kubernetes', 'not (upload_datatype or extended_metadata or kubernetes)']
subset: ['KubernetesDependencyResolutionIntegrationTestCase']
services:
postgres:
image: postgres:11
@@ -27,15 +27,20 @@ jobs:
steps:
- name: Prune unused docker image, volumes and containers
run: docker system prune -a -f
- name: Setup Minikube
- name: Clean dotnet folder for space
if: matrix.subset == 'kubernetes'
run: rm -Rf /usr/share/dotnet
- name: Setup Minikube
if: matrix.subset == 'KubernetesDependencyResolutionIntegrationTestCase'
id: minikube
uses: CodingNagger/minikube-setup-action@v1.0.2
uses: CodingNagger/minikube-setup-action@v1.0.3
with:
minikube-version: "1.9.0-0_amd64"
- name: Launch Minikube
if: matrix.subset == 'kubernetes'
if: matrix.subset == 'KubernetesDependencyResolutionIntegrationTestCase'
run: eval ${{ steps.minikube.outputs.launcher }}
- name: Check pods
if: matrix.subset == 'kubernetes'
if: matrix.subset == 'KubernetesDependencyResolutionIntegrationTestCase'
run: |
kubectl get pods
- uses: actions/checkout@v2
2 changes: 1 addition & 1 deletion lib/galaxy/config/__init__.py
@@ -636,7 +636,7 @@ def _process_config(self, kwargs):
# InteractiveTools propagator mapping file
self.interactivetools_map = self.resolve_path(kwargs.get("interactivetools_map", os.path.join(self.data_dir, "interactivetools_map.sqlite")))
self.interactivetool_prefix = kwargs.get("interactivetools_prefix", "interactivetool")
self.interactivetool_proxy_host = kwargs.get("interactivetool_proxy_host", None)
self.interactivetool_proxy_host = kwargs.get("interactivetools_proxy_host", None)

self.containers_conf = parse_containers_config(self.containers_config_file)

@@ -123,7 +123,7 @@ pbr==5.4.4
prettytable==0.7.2
prov==1.5.1
psutil==5.6.7
pulsar-galaxy-lib==0.14.0.dev1
pulsar-galaxy-lib==0.14.0.dev2
pyasn1-modules==0.2.7
pyasn1==0.4.8
pycparser==2.19
8 changes: 8 additions & 0 deletions lib/galaxy/jobs/__init__.py
@@ -1122,6 +1122,14 @@ def _setup_working_directory(self, job=None):
raise Exception('(%s) Unable to create job working directory',
job.id)

@property
def guest_ports(self):
if hasattr(self, "interactivetools"):
guest_ports = [ep.get('port') for ep in self.interactivetools]
return guest_ports
else:
return []

@property
def working_directory(self):
if self.__working_directory is None:
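
The new guest_ports property centralizes what runners previously computed inline from the job wrapper's interactive tool entry points. A minimal illustration of the shape involved, using invented data rather than Galaxy code: each entry in interactivetools is expected to carry a port key, and the property simply collects those values.

# Illustrative sketch, not part of this diff; the entry dicts are hypothetical examples.
interactivetools = [
    {"name": "jupyter", "port": 8888},
    {"name": "dashboard", "port": 8080},
]

guest_ports = [ep.get("port") for ep in interactivetools]
print(guest_ports)  # [8888, 8080]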
2 changes: 1 addition & 1 deletion lib/galaxy/jobs/runners/__init__.py
@@ -425,7 +425,7 @@ def _find_container(
compute_tmp_directory = job_wrapper.tmp_directory()

tool = job_wrapper.tool
guest_ports = [ep.get('port') for ep in getattr(job_wrapper, 'interactivetools', [])]
guest_ports = job_wrapper.guest_ports
tool_info = ToolInfo(
tool.containers,
tool.requirements,
78 changes: 34 additions & 44 deletions lib/galaxy/jobs/runners/kubernetes.py
@@ -19,12 +19,15 @@
from galaxy.jobs.runners.util.pykube_util import (
DEFAULT_JOB_API_VERSION,
ensure_pykube,
find_job_object_by_name,
galaxy_instance_id,
Job,
job_object_dict,
Pod,
produce_unique_k8s_job_name,
pull_policy,
pykube_client_from_dict,
stop_job,
)
from galaxy.util.bytesize import ByteSize

@@ -201,25 +204,8 @@ def __get_fs_group(self):
return None

def __get_galaxy_instance_id(self):
"""
Gets the id of the Galaxy instance. This will be added to Jobs and Pods names, so it needs to be DNS friendly,
this means: `The Internet standards (Requests for Comments) for protocols mandate that component hostname labels
may contain only the ASCII letters 'a' through 'z' (in a case-insensitive manner), the digits '0' through '9',
and the minus sign ('-').`

It looks for the value set on self.runner_params['k8s_galaxy_instance_id'], which might or not be set. The
idea behind this is to allow the Galaxy instance to trust (or not) existing k8s Jobs and Pods that match the
setup of a Job that is being recovered or restarted after a downtime/reboot.
:return:
:rtype:
"""
if "k8s_galaxy_instance_id" in self.runner_params:
if re.match(r"(?!-)[a-z\d-]{1,20}(?<!-)$", self.runner_params['k8s_galaxy_instance_id']):
return self.runner_params['k8s_galaxy_instance_id']
else:
log.error("Galaxy instance '" + self.runner_params['k8s_galaxy_instance_id'] + "' is either too long "
+ '(>20 characters) or it includes non DNS acceptable characters, ignoring it.')
return None
"""Parse the ID of the Galaxy instance from runner params."""
return galaxy_instance_id(self.runner_params)

def __produce_unique_k8s_job_name(self, galaxy_internal_job_id):
# wrapper.get_id_tag() instead of job_id for compatibility with TaskWrappers.
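
The inline validation removed above now lives behind the galaxy_instance_id helper imported from galaxy.jobs.runners.util.pykube_util at the top of this file. A sketch of what that helper presumably contains, reconstructed from the deleted code; the exact signature and module layout in pykube_util may differ.

# Illustrative sketch, not part of this diff; reconstructed from the removed inline logic.
import logging
import re

log = logging.getLogger(__name__)


def galaxy_instance_id(params):
    """Parse and validate k8s_galaxy_instance_id from runner params, if set.

    The ID is appended to Job and Pod names, so it must be a DNS-friendly label:
    lowercase ASCII letters, digits, and '-', with no leading or trailing '-'.
    """
    if "k8s_galaxy_instance_id" in params:
        raw_value = params["k8s_galaxy_instance_id"]
        if re.match(r"(?!-)[a-z\d-]{1,20}(?<!-)$", raw_value):
            return raw_value
        log.error("Galaxy instance '%s' is either too long (>20 characters) or it includes "
                  "non DNS acceptable characters, ignoring it.", raw_value)
    return None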
@@ -279,9 +265,10 @@ def __get_k8s_containers(self, ajs):
Setting these variables changes the described behaviour in the job file shell script
used to execute the tool inside the container.
"""
container = self._find_container(ajs.job_wrapper)
k8s_container = {
"name": self.__get_k8s_container_name(ajs.job_wrapper),
"image": self._find_container(ajs.job_wrapper).container_id,
"image": container.container_id,
# this form of command overrides the entrypoint and allows multi command
# command line execution, separated by ;, which is what Galaxy does
# to assemble the command.
@@ -290,7 +277,9 @@ def __get_k8s_containers(self, ajs):
"workingDir": ajs.job_wrapper.working_directory,
"volumeMounts": self.runner_params['k8s_volume_mounts']
}

guest_ports = container.tool_info.guest_ports
if guest_ports:
k8s_container["ports"] = [{"containerPort": p} for p in guest_ports]
resources = self.__get_resources(ajs.job_wrapper)
if resources:
envs = []
@@ -457,6 +446,14 @@ def check_watched_item(self, job_state):
if not job_state.running:
job_state.running = True
job_state.job_wrapper.change_state(model.Job.states.RUNNING)
guest_ports = job_state.job_wrapper.guest_ports
if len(guest_ports) > 0:
pod = self._get_pod_for_job(job_state)
pod_ip = pod.obj['status']['podIP']
ports_dict = {}
for guest_port in guest_ports:
ports_dict[str(guest_port)] = dict(host=pod_ip, port=guest_port, protocol="http")
self.app.interactivetool_manager.configure_entry_points(job_state.job_wrapper.get_job(), ports_dict)
return job_state
elif job_state.job_wrapper.get_job().state == model.Job.states.DELETED:
# Job has been deleted via stop_job and job has not been deleted,
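
Once the pod is running, the code above reports the pod IP for each declared guest port so the interactive tool manager can build its proxy routes. A hypothetical example of the mapping handed to configure_entry_points, with the IP and ports invented for illustration; keys are the guest ports as strings, matching the loop above.

# Illustrative sketch, not part of this diff; values are made up.
ports_dict = {
    "8888": {"host": "10.244.0.17", "port": 8888, "protocol": "http"},
    "8080": {"host": "10.244.0.17", "port": 8080, "protocol": "http"},
}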
@@ -509,37 +506,31 @@ def _handle_job_failure(self, job, job_state):

def __cleanup_k8s_job(self, job):
k8s_cleanup_job = self.runner_params['k8s_cleanup_job']
job_failed = (job.obj['status']['failed'] > 0
if 'failed' in job.obj['status'] else False)
# Scale down the job just in case even if cleanup is never
job.scale(replicas=0)
if (k8s_cleanup_job == "always" or
(k8s_cleanup_job == "onsuccess" and not job_failed)):
delete_options = {
"apiVersion": "v1",
"kind": "DeleteOptions",
"propagationPolicy": "Background"
}
r = job.api.delete(json=delete_options, **job.api_kwargs())
job.api.raise_for_status(r)
stop_job(job, k8s_cleanup_job)

def __job_failed_due_to_walltime_limit(self, job):
conditions = job.obj['status'].get('conditions') or []
return any(True for c in conditions if c['type'] == 'Failed' and c['reason'] == 'DeadlineExceeded')

def _get_pod_for_job(self, job_state):
pods = Pod.objects(self._pykube_api).filter(selector="app=%s" % job_state.job_id,
namespace=self.runner_params['k8s_namespace'])
if not pods.response['items']:
return None

pod = Pod(self._pykube_api, pods.response['items'][0])
return pod

def __job_failed_due_to_low_memory(self, job_state):
"""
checks the state of the pod to see if it was killed
for being out of memory (pod status OOMKilled). If that is the case
marks the job for resubmission (resubmit logic is part of destinations).
"""

pods = Pod.objects(self._pykube_api).filter(selector="app=%s" % job_state.job_id,
namespace=self.runner_params['k8s_namespace'])
if not pods.response['items']:
pod = self._get_pod_for_job(job_state)
if pod is None:
return False

pod = Pod(self._pykube_api, pods.response['items'][0])
if pod.obj['status']['phase'] == "Failed" and \
pod.obj['status']['containerStatuses'][0]['state']['terminated']['reason'] == "OOMKilled":
return True
Expand All @@ -550,11 +541,10 @@ def stop_job(self, job_wrapper):
"""Attempts to delete a dispatched job to the k8s cluster"""
job = job_wrapper.get_job()
try:
jobs = Job.objects(self._pykube_api).filter(
selector="app=" + self.__produce_unique_k8s_job_name(job.get_id_tag()),
namespace=self.runner_params['k8s_namespace'])
if len(jobs.response['items']) > 0:
job_to_delete = Job(self._pykube_api, jobs.response['items'][0])
name = self.__produce_unique_k8s_job_name(job.get_id_tag())
namespace = self.runner_params['k8s_namespace']
job_to_delete = find_job_object_by_name(self._pykube_api, name, namespace)
if job_to_delete:
self.__cleanup_k8s_job(job_to_delete)
# TODO assert whether job parallelism == 0
# assert not job_to_delete.exists(), "Could not delete job,"+job.job_runner_external_id+" it still exists"
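
find_job_object_by_name and stop_job are new helpers imported from pykube_util that absorb the selector and cleanup logic deleted above. A rough sketch of what they likely contain, pieced together from the removed code; the actual helpers in pykube_util may differ in detail.

# Illustrative sketch, not part of this diff; reconstructed from the removed inline cleanup code.
from pykube.objects import Job


def find_job_object_by_name(pykube_api, name, namespace=None):
    """Return the first k8s Job whose app label matches ``name``, or None."""
    jobs = Job.objects(pykube_api).filter(selector="app=%s" % name, namespace=namespace)
    if not jobs.response['items']:
        return None
    return Job(pykube_api, jobs.response['items'][0])


def stop_job(job, cleanup="always"):
    """Scale a k8s Job down and optionally delete it, mirroring the removed inline logic."""
    job_failed = job.obj['status'].get('failed', 0) > 0
    # Scale down the job just in case, even if it is never cleaned up.
    job.scale(replicas=0)
    if cleanup == "always" or (cleanup == "onsuccess" and not job_failed):
        delete_options = {
            "apiVersion": "v1",
            "kind": "DeleteOptions",
            "propagationPolicy": "Background",
        }
        r = job.api.delete(json=delete_options, **job.api_kwargs())
        job.api.raise_for_status(r)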
32 changes: 28 additions & 4 deletions lib/galaxy/jobs/runners/pulsar.py
@@ -189,6 +189,7 @@ class PulsarJobRunner(AsynchronousJobRunner):
runner_name = "PulsarJobRunner"
default_build_pulsar_app = False
use_mq = False
poll = True

def __init__(self, app, nworkers, **kwds):
"""Start the job runner."""
@@ -207,12 +208,13 @@ def _monitor(self):
if self.use_mq:
# This is a message queue driven runner, don't monitor
# just setup required callback.
self._init_noop_monitor()

self.client_manager.ensure_has_status_update_callback(self.__async_update)
self.client_manager.ensure_has_ack_consumers()
else:

if self.poll:
self._init_monitor_thread()
else:
self._init_noop_monitor()

def __init_client_manager(self):
pulsar_conf = self.runner_params.get('pulsar_app_config', None)
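
The use_mq and poll class attributes now control monitoring independently: the base runner polls, the MQ runner relies solely on status-update callbacks, and the Kubernetes MQ runner further down in this diff re-enables polling so it can look up pod IPs for interactive tools. A toy illustration of how the two flags combine; the class names here are invented for the example and are not Galaxy's.

# Illustrative sketch, not part of this diff; a toy model of the flag combinations.
class RunnerFlags:
    use_mq = False  # like PulsarJobRunner: plain polling
    poll = True

    def monitor_setup(self):
        steps = []
        if self.use_mq:
            steps.append("register MQ status-update callbacks and ack consumers")
        if self.poll:
            steps.append("start the polling monitor thread")
        else:
            steps.append("install a no-op monitor")
        return steps


class MQFlags(RunnerFlags):      # like PulsarMQJobRunner: callbacks only
    use_mq = True
    poll = False


class K8sMQFlags(MQFlags):       # like PulsarKubernetesJobRunner: callbacks plus pod IP polling
    poll = True


for cls in (RunnerFlags, MQFlags, K8sMQFlags):
    print(cls.__name__, "->", cls().monitor_setup())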
@@ -262,6 +264,23 @@ def url_to_destination(self, url):
return JobDestination(runner="pulsar", params=url_to_destination_params(url))

def check_watched_item(self, job_state):
if self.use_mq:
# Might still need to check pod IPs.
job_wrapper = job_state.job_wrapper
guest_ports = job_wrapper.guest_ports
if len(guest_ports) > 0:
client = self.get_client_from_state(job_state)
job_ip = client.job_ip()
if job_ip:
ports_dict = {}
for guest_port in guest_ports:
ports_dict[str(guest_port)] = dict(host=job_ip, port=guest_port, protocol="http")
self.app.interactivetool_manager.configure_entry_points(job_wrapper.get_job(), ports_dict)
return job_state
else:
return self.check_watched_item_state(job_state)

def check_watched_item_state(self, job_state):
try:
client = self.get_client_from_state(job_state)
status = client.get_status()
@@ -277,6 +296,7 @@ def check_watched_item(self, job_state):
return job_state

def _update_job_state_for_status(self, job_state, pulsar_status, full_status=None):
log.info("in _update_job_state_for_status...")
if pulsar_status == "complete":
self.mark_as_finished(job_state)
return None
@@ -366,6 +386,7 @@ def queue_job(self, job_wrapper):
remote_pulsar_app_config=remote_pulsar_app_config,
job_directory_files=job_directory_files,
container=None if not remote_container else remote_container.container_id,
guest_ports=job_wrapper.guest_ports,
)
job_id = pulsar_submit_job(client, client_job_description, remote_job_config)
log.info("Pulsar job submitted with job_id %s" % job_id)
@@ -823,6 +844,7 @@ def __build_metadata_configuration(self, client, job_wrapper, remote_metadata, r
return metadata_kwds

def __async_update(self, full_status):
log.info("\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n IN AN UPDATE \n\n\n\n\n\n\n\n\n\n")
galaxy_job_id = None
try:
remote_job_id = full_status["job_id"]
@@ -853,6 +875,7 @@ class PulsarLegacyJobRunner(PulsarJobRunner):
class PulsarMQJobRunner(PulsarJobRunner):
"""Flavor of Pulsar job runner with sensible defaults for message queue communication."""
use_mq = True
poll = False

destination_defaults = dict(
default_file_action="remote_transfer",
@@ -868,7 +891,7 @@ class PulsarMQJobRunner(PulsarJobRunner):
"default_file_action": "remote_transfer",
"rewrite_parameters": "true",
"jobs_directory": "/pulsar_staging",
"pulsar_container_image": "galaxy/pulsar-pod-staging:0.13.0",
"pulsar_container_image": "galaxy/pulsar-pod-staging:0.14.0",
"remote_container_handling": True,
"k8s_enabled": True,
"url": PARAMETER_SPECIFICATION_IGNORED,
@@ -878,6 +901,7 @@

class PulsarKubernetesJobRunner(PulsarMQJobRunner):
destination_defaults = KUBERNETES_DESTINATION_DEFAULTS
poll = True # Poll so we can check API for pod IP for ITs.

def _populate_parameter_defaults(self, job_destination):
super(PulsarKubernetesJobRunner, self)._populate_parameter_defaults(job_destination)