Merge pull request #16384 from mvdbeek/job_cache_fixes_for_dces
[23.1] Job cache fixes for DCEs (dataset collection elements)
jdavcs authored Jul 19, 2023
2 parents c366464 + f76b152 commit 53c6fe8
Showing 5 changed files with 157 additions and 55 deletions.
130 changes: 77 additions & 53 deletions lib/galaxy/managers/jobs.py
@@ -35,6 +35,10 @@
from galaxy.managers.datasets import DatasetManager
from galaxy.managers.hdas import HDAManager
from galaxy.managers.lddas import LDDAManager
from galaxy.model import (
Job,
JobParameter,
)
from galaxy.model.base import transaction
from galaxy.model.index_filter_util import (
raw_text_column_filter,
@@ -325,6 +329,37 @@ def replace_dataset_ids(path, key, value):
return key, value
return key, value

job_conditions = [
# build one subquery that selects a job with correct job parameters

subq = select([model.Job.id]).where(
and_(
model.Job.tool_id == tool_id,
model.Job.user == user,
model.Job.user_id == user.id,
model.Job.copied_from_job_id.is_(None), # Always pick original job
)
]

)
if tool_version:
job_conditions.append(model.Job.tool_version == str(tool_version))
subq = subq.where(Job.tool_version == str(tool_version))

if job_state is None:
job_conditions.append(
model.Job.state.in_(
[
model.Job.states.NEW,
model.Job.states.QUEUED,
model.Job.states.WAITING,
model.Job.states.RUNNING,
model.Job.states.OK,
]
subq = subq.where(
Job.state.in_(
[Job.states.NEW, Job.states.QUEUED, Job.states.WAITING, Job.states.RUNNING, Job.states.OK]
)
)
else:
if isinstance(job_state, str):
job_conditions.append(model.Job.state == job_state)
subq = subq.where(Job.state == job_state)
elif isinstance(job_state, list):
o = []
for s in job_state:
o.append(model.Job.state == s)
job_conditions.append(or_(*o))
subq = subq.where(or_(*[Job.state == s for s in job_state]))

# exclude jobs with deleted outputs
subq = subq.where(
and_(
model.Job.any_output_dataset_collection_instances_deleted == false(),
model.Job.any_output_dataset_deleted == false(),
)
)

for k, v in wildcard_param_dump.items():
wildcard_value = None
@@ -371,26 +375,26 @@ def replace_dataset_ids(path, key, value):
if not wildcard_value:
value_dump = json.dumps(v, sort_keys=True)
wildcard_value = value_dump.replace('"id": "__id_wildcard__"', '"id": %')
a = aliased(model.JobParameter)
a = aliased(JobParameter)
if value_dump == wildcard_value:
job_conditions.append(
subq = subq.join(a).where(
and_(
model.Job.id == a.job_id,
Job.id == a.job_id,
a.name == k,
a.value == value_dump,
)
)
else:
job_conditions.append(and_(model.Job.id == a.job_id, a.name == k, a.value.like(wildcard_value)))
subq = subq.join(a).where(
and_(
Job.id == a.job_id,
a.name == k,
a.value.like(wildcard_value),
)
)

job_conditions.append(
and_(
model.Job.any_output_dataset_collection_instances_deleted == false(),
model.Job.any_output_dataset_deleted == false(),
)
)
query = select([Job.id]).select_from(Job.table.join(subq, subq.c.id == Job.id))

subq = self.sa_session.query(model.Job.id).filter(*job_conditions).subquery()
data_conditions = []

# We now build the query filters that relate to the input datasets
@@ -416,14 +420,19 @@ def replace_dataset_ids(path, key, value):
c = aliased(model.HistoryDatasetAssociation)
d = aliased(model.JobParameter)
e = aliased(model.HistoryDatasetAssociationHistory)
query.add_columns(a.dataset_id)
used_ids.append(a.dataset_id)
query = query.join(a, a.job_id == model.Job.id)
stmt = select([model.HistoryDatasetAssociation.id]).where(
model.HistoryDatasetAssociation.id == e.history_dataset_association_id
)
# b is the HDA used for the job
query = query.join(b, a.dataset_id == b.id).join(c, c.dataset_id == b.dataset_id)
name_condition = []
if identifier:
query = query.join(d)
data_conditions.append(
and_(
model.Job.id == d.job_id,
d.name.in_({f"{_}|__identifier__" for _ in k}),
d.value == json.dumps(identifier),
)
@@ -444,10 +453,7 @@ def replace_dataset_ids(path, key, value):
)
data_conditions.append(
and_(
a.job_id == model.Job.id,
a.name.in_(k),
a.dataset_id == b.id, # b is the HDA used for the job
c.dataset_id == b.dataset_id,
c.id == v, # c is the requested job input HDA
# We need to make sure that the job we are looking for has been run with identical inputs.
# Here we deal with 3 requirements:
@@ -466,23 +472,26 @@ def replace_dataset_ids(path, key, value):
or_(b.deleted == false(), c.deleted == false()),
)
)

used_ids.append(a.dataset_id)
elif t == "ldda":
a = aliased(model.JobToInputLibraryDatasetAssociation)
data_conditions.append(and_(model.Job.id == a.job_id, a.name.in_(k), a.ldda_id == v))
query = query.add_columns(a.ldda_id)
query = query.join(a, a.job_id == model.Job.id)
data_conditions.append(and_(a.name.in_(k), a.ldda_id == v))
used_ids.append(a.ldda_id)
elif t == "hdca":
a = aliased(model.JobToInputDatasetCollectionAssociation)
b = aliased(model.HistoryDatasetCollectionAssociation)
c = aliased(model.HistoryDatasetCollectionAssociation)
query = query.add_columns(a.dataset_collection_id)
query = (
query.join(a, a.job_id == model.Job.id)
.join(b, b.id == a.dataset_collection_id)
.join(c, b.name == c.name)
)
data_conditions.append(
and_(
model.Job.id == a.job_id,
a.name.in_(k),
b.id == a.dataset_collection_id,
c.id == v,
b.name == c.name,
or_(
and_(b.deleted == false(), b.id == v),
and_(
@@ -500,28 +509,43 @@ def replace_dataset_ids(path, key, value):
a = aliased(model.JobToInputDatasetCollectionElementAssociation)
b = aliased(model.DatasetCollectionElement)
c = aliased(model.DatasetCollectionElement)
d = aliased(model.HistoryDatasetAssociation)
e = aliased(model.HistoryDatasetAssociation)
query = query.add_columns(a.dataset_collection_element_id)
query = (
query.join(a)
.join(b, b.id == a.dataset_collection_element_id)
.join(
c,
and_(
c.element_identifier == b.element_identifier,
or_(c.hda_id == b.hda_id, c.child_collection_id == b.child_collection_id),
),
)
.outerjoin(d, d.id == c.hda_id)
.outerjoin(e, e.dataset_id == d.dataset_id)
)
data_conditions.append(
and_(
model.Job.id == a.job_id,
a.name.in_(k),
a.dataset_collection_element_id == b.id,
b.element_identifier == c.element_identifier,
c.child_collection_id == b.child_collection_id,
or_(
c.child_collection_id == b.child_collection_id,
and_(
c.hda_id == b.hda_id,
d.id == c.hda_id,
e.dataset_id == d.dataset_id,
),
),
c.id == v,
)
)
used_ids.append(a.dataset_collection_element_id)
else:
return []

query = (
self.sa_session.query(model.Job.id, *used_ids)
.join(subq, model.Job.id == subq.c.id)
.filter(*data_conditions)
.group_by(model.Job.id, *used_ids)
.order_by(model.Job.id.desc())
)
for job in query:
query = query.where(*data_conditions).group_by(model.Job.id, *used_ids).order_by(model.Job.id.desc())

for job in self.sa_session.execute(query):
# We found a job that is equal in terms of tool_id, user, state and input datasets,
# but to be able to verify that the parameters match we need to modify all instances of
# dataset_ids (HDA, LDDA, HDCA) in the incoming param_dump to point to those used by the
Expand Down Expand Up @@ -556,7 +580,7 @@ def replace_dataset_ids(path, key, value):
and_(model.Job.id == a.job_id, a.name == k, a.value == json.dumps(v, sort_keys=True))
)
else:
job_parameter_conditions = [model.Job.id == job]
job_parameter_conditions = [model.Job.id == job[0]]
query = self.sa_session.query(model.Job).filter(*job_parameter_conditions)
job = query.first()
if job is None:
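The net effect of the jobs.py changes above: instead of collecting everything into one flat `job_conditions` list, the lookup first builds a subquery of candidate job ids (tool, user, state, parameters, no deleted outputs, and `copied_from_job_id IS NULL` so comparison is always against the original job), then joins the input-association tables onto that subquery and groups by job id plus the consumed input ids. A minimal, self-contained sketch of that two-stage pattern; `Job` and `JobToInputDatasetAssociation` below are simplified stand-ins, not the Galaxy schema, and 1.4/2.0-style `select()` calls are used rather than the legacy `select([...])` form shown in the diff:

```python
# Toy illustration of the two-stage job-cache lookup pattern:
# (1) narrow candidate jobs to a subquery, (2) join input associations onto it
# and group by job id + input ids.  Simplified models only, not the Galaxy schema.
from sqlalchemy import Boolean, Column, ForeignKey, Integer, String, and_, create_engine, false, select
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()


class Job(Base):
    __tablename__ = "job"
    id = Column(Integer, primary_key=True)
    tool_id = Column(String)
    user_id = Column(Integer)
    state = Column(String)
    copied_from_job_id = Column(Integer, nullable=True)
    any_output_deleted = Column(Boolean, default=False)  # stand-in for the column_property


class JobToInputDatasetAssociation(Base):
    __tablename__ = "job_to_input_dataset"
    id = Column(Integer, primary_key=True)
    job_id = Column(Integer, ForeignKey("job.id"))
    name = Column(String)
    dataset_id = Column(Integer)


def equivalent_job_ids(session, tool_id, user_id, input_name, input_dataset_id):
    # Stage 1: candidate jobs, always comparing against the original (non-copied) job
    # and excluding jobs whose outputs were deleted.
    subq = (
        select(Job.id)
        .where(
            and_(
                Job.tool_id == tool_id,
                Job.user_id == user_id,
                Job.state == "ok",
                Job.copied_from_job_id.is_(None),
                Job.any_output_deleted == false(),
            )
        )
        .subquery()
    )
    # Stage 2: join the input-association table onto the subquery, keep the dataset id
    # in the SELECT so callers can remap parameter dumps, then group/order by job id.
    a = JobToInputDatasetAssociation
    stmt = (
        select(Job.id, a.dataset_id)
        .select_from(Job)
        .join(subq, subq.c.id == Job.id)
        .join(a, a.job_id == Job.id)
        .where(and_(a.name == input_name, a.dataset_id == input_dataset_id))
        .group_by(Job.id, a.dataset_id)
        .order_by(Job.id.desc())
    )
    return [row[0] for row in session.execute(stmt)]


if __name__ == "__main__":
    engine = create_engine("sqlite://")
    Base.metadata.create_all(engine)
    with Session(engine) as session:
        session.add(Job(id=1, tool_id="cat1", user_id=7, state="ok"))
        session.add(JobToInputDatasetAssociation(job_id=1, name="input1", dataset_id=42))
        session.commit()
        print(equivalent_job_ids(session, "cat1", 7, "input1", 42))  # -> [1]
```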
3 changes: 2 additions & 1 deletion lib/galaxy/model/__init__.py
@@ -1361,6 +1361,7 @@ class Job(Base, JobLike, UsesCreateAndUpdateTime, Dictifiable, Serializable):
"create_time",
"galaxy_version",
"command_version",
"copied_from_job_id",
]

_numeric_metric = JobMetricNumeric
@@ -10669,7 +10670,7 @@ class CleanupEventImplicitlyConvertedDatasetAssociationAssociation(Base):
)

Job.any_output_dataset_deleted = column_property(
exists(HistoryDatasetAssociation).where(
exists(HistoryDatasetAssociation.id).where(
and_(
Job.id == JobToOutputDatasetAssociation.job_id,
HistoryDatasetAssociation.table.c.id == JobToOutputDatasetAssociation.dataset_id,
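The `any_output_dataset_deleted` tweak keeps the correlated EXISTS selecting a single column (`HistoryDatasetAssociation.id`) rather than the whole mapped entity. A toy version of that `column_property` pattern, with hypothetical `Job`/`OutputDataset` models in place of the Galaxy schema:

```python
# Toy illustration of a correlated EXISTS column_property that selects a single column.
# Hypothetical Job/OutputDataset models, not the Galaxy schema.
from sqlalchemy import Boolean, Column, ForeignKey, Integer, and_, create_engine, false, select, true
from sqlalchemy.orm import Session, column_property, declarative_base

Base = declarative_base()


class OutputDataset(Base):
    __tablename__ = "output_dataset"
    id = Column(Integer, primary_key=True)
    job_id = Column(Integer, ForeignKey("job.id"), index=True)
    deleted = Column(Boolean, default=False)


class Job(Base):
    __tablename__ = "job"
    id = Column(Integer, primary_key=True)
    # Correlated scalar EXISTS: "does this job have any deleted output?"
    any_output_deleted = column_property(
        select(OutputDataset.id)
        .where(and_(OutputDataset.job_id == id, OutputDataset.deleted == true()))
        .correlate_except(OutputDataset)
        .exists()
    )


engine = create_engine("sqlite://")
Base.metadata.create_all(engine)
with Session(engine) as session:
    session.add(Job(id=1))
    session.add(OutputDataset(id=1, job_id=1, deleted=False))
    session.commit()
    # The property can then be filtered on, e.g. to exclude jobs with deleted outputs:
    print(session.scalars(select(Job.id).where(Job.any_output_deleted == false())).all())  # -> [1]
```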
29 changes: 29 additions & 0 deletions lib/galaxy/tools/execute.py
@@ -457,6 +457,24 @@ def finalize_dataset_collections(self, trans):
)
trans.sa_session.add(implicit_collection.collection)
else:
completed_collections = {}
if (
self.completed_jobs
and self.implicit_collection_jobs
and len(self.completed_jobs) == len(self.successful_jobs)
):
# If the same number of implicit collection jobs went into
# creating the collection and those jobs are all cached
# the HDCA has effectively been copied.
# We mark this here so that the job cache query in subsequent
# jobs considers this to be a valid cached input.
completed_job_ids = {job.id for job in self.completed_jobs.values() if job}
if all(job.copied_from_job_id in completed_job_ids for job in self.implicit_collection_jobs.job_list):
completed_collections = {
jtodca.name: jtodca.dataset_collection_instance
for jtodca in self.completed_jobs[0].output_dataset_collection_instances
}
implicit_collection = None
for i, implicit_collection in enumerate(self.implicit_collections.values()):
if i == 0:
implicit_collection_jobs = implicit_collection.implicit_collection_jobs
@@ -465,7 +483,18 @@ def finalize_dataset_collections(self, trans):
implicit_collection.collection.finalize(
collection_type_description=self.collection_info.structure.collection_type_description
)

# Mark implicit HDCA as copied
completed_implicit_collection = implicit_collection and completed_collections.get(
implicit_collection.implicit_output_name
)
if completed_implicit_collection:
implicit_collection.copied_from_history_dataset_collection_association_id = (
completed_implicit_collection.id
)

trans.sa_session.add(implicit_collection.collection)

with transaction(trans.sa_session):
trans.sa_session.commit()

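Restated outside the tracker class, the condition added to `finalize_dataset_collections` amounts to: if every job in this batch came back from the cache and every job attached to the implicit collection was copied from one of those cached jobs, then the implicit HDCA is effectively a copy of a previously produced one. A hedged restatement of that predicate (attribute names are stand-ins for the execution tracker's fields, not a Galaxy API):

```python
def implicit_hdca_effectively_copied(completed_jobs, successful_jobs, implicit_collection_jobs):
    """Return True if the implicit HDCA produced by this batch is effectively a copy.

    completed_jobs: mapping of job index -> cached job (or None on a cache miss).
    successful_jobs: jobs that ran successfully in this batch.
    implicit_collection_jobs: object with a .job_list of jobs attached to the implicit collection.
    Mirrors the condition added above; names are illustrative stand-ins.
    """
    if not completed_jobs or not implicit_collection_jobs:
        return False
    if len(completed_jobs) != len(successful_jobs):
        return False
    # All cache hits: collect the ids of the previously run jobs ...
    completed_job_ids = {job.id for job in completed_jobs.values() if job}
    # ... and require every job on the implicit collection to be a copy of one of them.
    return all(job.copied_from_job_id in completed_job_ids for job in implicit_collection_jobs.job_list)
```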
42 changes: 42 additions & 0 deletions lib/galaxy_test/api/test_workflows.py
@@ -4548,6 +4548,48 @@ def test_workflow_rerun_with_use_cached_job(self):
first_wf_output["file_name"] == second_wf_output["file_name"]
), f"first output:\n{first_wf_output}\nsecond output:\n{second_wf_output}"

@skip_without_tool("cat1")
@skip_without_tool("identifier_multiple")
def test_workflow_rerun_with_cached_job_consumes_implicit_hdca(self, history_id: str):
workflow = """
class: GalaxyWorkflow
inputs:
collection_input:
type: data_collection_input
steps:
map_over:
tool_id: cat1
in:
input1: collection_input
consume_hdca:
tool_id: identifier_multiple
in:
input1: map_over/out_file1
"""
workflow_id = self.workflow_populator.upload_yaml_workflow(name="Consume HDCA", yaml_content=workflow)
hdca1 = self.dataset_collection_populator.create_list_in_history(
history_id, contents=[("sample1-1", "1 2 3"), ("sample2-1", "7 8 9")]
).json()
hdca1 = self.dataset_collection_populator.wait_for_fetched_collection(hdca1)
workflow_request = {
"inputs": json.dumps({"collection_input": self._ds_entry(hdca1)}),
"history": f"hist_id={history_id}",
"use_cached_job": True,
"inputs_by": "name",
}
first_invocation_summary = self.workflow_populator.invoke_workflow_and_wait(
workflow_id, request=workflow_request
).json()
first_invocation = self.workflow_populator.get_invocation(first_invocation_summary["id"], step_details=True)
final_job_id_first_invocation = first_invocation["steps"][2]["jobs"][0]["id"]
second_invocation_summary = self.workflow_populator.invoke_workflow_and_wait(
workflow_id, request=workflow_request
).json()
second_invocation = self.workflow_populator.get_invocation(second_invocation_summary["id"], step_details=True)
final_job_id_second_invocation = second_invocation["steps"][2]["jobs"][0]["id"]
final_job = self.dataset_populator.get_job_details(final_job_id_second_invocation, full=True).json()
assert final_job["copied_from_job_id"] == final_job_id_first_invocation

@skip_without_tool("cat1")
def test_nested_workflow_rerun_with_use_cached_job(self):
with self.dataset_populator.test_history() as history_id_one, self.dataset_populator.test_history() as history_id_two:
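Outside the test framework, the same cache-hit check can be made against a running Galaxy by asking the jobs API for the full job record and comparing `copied_from_job_id`, which the model change above makes visible in the serialized job. A rough sketch using `requests`; the base URL, API key, and job ids are placeholders, and the `full=true` parameter mirrors what `get_job_details(..., full=True)` requests in the test:

```python
import requests

GALAXY_URL = "https://galaxy.example.org"  # placeholder instance URL
API_KEY = "..."                            # placeholder Galaxy API key


def job_was_cache_copied(job_id: str, expected_source_job_id: str) -> bool:
    """Fetch the full job record and check whether it was copied from the expected job."""
    response = requests.get(
        f"{GALAXY_URL}/api/jobs/{job_id}",
        params={"full": "true", "key": API_KEY},
        timeout=30,
    )
    response.raise_for_status()
    job = response.json()
    # copied_from_job_id appears in the job dict once it is part of
    # Job.dict_element_visible_keys, as added in this change.
    return job.get("copied_from_job_id") == expected_source_job_id
```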
8 changes: 7 additions & 1 deletion test/unit/app/test_remote_shell.py
@@ -8,7 +8,7 @@
try:
import mockssh
except ImportError:
raise unittest.SkipTest("Skipping tests that require mockssh")
mockssh = None

from galaxy.jobs.runners.util.cli import CliInterface
from galaxy.security.ssh_util import (
@@ -42,6 +42,8 @@ def setUpClass(cls):
cls.cli_interface = CliInterface()

def test_secure_shell_plugin_without_strict(self):
if not mockssh:
raise unittest.SkipTest("Skipping tests that require mockssh")
with mockssh.Server(users={self.username: self.ssh_keys.private_key_file}) as server:
self.shell_params["port"] = server.port
self.shell_params["plugin"] = "SecureShell"
@@ -51,13 +53,17 @@ def test_get_shell_plugin(self):
assert result.stdout.strip() == "hello"

def test_get_shell_plugin(self):
if not mockssh:
raise unittest.SkipTest("Skipping tests that require mockssh")
with mockssh.Server(users={self.username: self.ssh_keys.private_key_file}) as server:
self.shell_params["port"] = server.port
self.shell_params["plugin"] = "ParamikoShell"
self.shell = self.cli_interface.get_shell_plugin(self.shell_params)
assert self.shell.username == self.username

def test_paramiko_shell_plugin(self):
if not mockssh:
raise unittest.SkipTest("Skipping tests that require mockssh")
with mockssh.Server(users={self.username: self.ssh_keys.private_key_file}) as server:
self.shell_params["port"] = server.port
self.shell_params["plugin"] = "ParamikoShell"
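An equivalent way to express the per-test guards added here would be `unittest.skipUnless` at the class level, which keeps the skip reason in one place while still avoiding the old import-time `raise unittest.SkipTest` that skipped the whole module during collection. A sketch (the class name is illustrative, not the module's actual test class):

```python
import unittest

try:
    import mockssh
except ImportError:
    mockssh = None


@unittest.skipUnless(mockssh, "Skipping tests that require mockssh")
class MockSSHShellTestCase(unittest.TestCase):
    """Illustrative stand-in for the test class patched above."""

    def test_paramiko_shell_plugin(self):
        # Runs only when mockssh imported successfully; otherwise the whole
        # class is reported as skipped instead of erroring at import time.
        ...
```

With the class-level decorator, `setUpClass` is skipped as well; the per-test guards in the diff keep it running regardless of whether mockssh is installed.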
