diff --git a/lib/galaxy/managers/jobs.py b/lib/galaxy/managers/jobs.py
index 54c01bcea4aa..30040843e2d5 100644
--- a/lib/galaxy/managers/jobs.py
+++ b/lib/galaxy/managers/jobs.py
@@ -35,6 +35,10 @@
 from galaxy.managers.datasets import DatasetManager
 from galaxy.managers.hdas import HDAManager
 from galaxy.managers.lddas import LDDAManager
+from galaxy.model import (
+    Job,
+    JobParameter,
+)
 from galaxy.model.base import transaction
 from galaxy.model.index_filter_util import (
     raw_text_column_filter,
@@ -325,37 +329,37 @@ def replace_dataset_ids(path, key, value):
                 return key, value
             return key, value
 
-        job_conditions = [
+        # build one subquery that selects a job with correct job parameters
+
+        subq = select([model.Job.id]).where(
             and_(
                 model.Job.tool_id == tool_id,
-                model.Job.user == user,
+                model.Job.user_id == user.id,
                 model.Job.copied_from_job_id.is_(None),  # Always pick original job
             )
-        ]
-
+        )
         if tool_version:
-            job_conditions.append(model.Job.tool_version == str(tool_version))
+            subq = subq.where(Job.tool_version == str(tool_version))
 
         if job_state is None:
-            job_conditions.append(
-                model.Job.state.in_(
-                    [
-                        model.Job.states.NEW,
-                        model.Job.states.QUEUED,
-                        model.Job.states.WAITING,
-                        model.Job.states.RUNNING,
-                        model.Job.states.OK,
-                    ]
+            subq = subq.where(
+                Job.state.in_(
+                    [Job.states.NEW, Job.states.QUEUED, Job.states.WAITING, Job.states.RUNNING, Job.states.OK]
                 )
             )
         else:
             if isinstance(job_state, str):
-                job_conditions.append(model.Job.state == job_state)
+                subq = subq.where(Job.state == job_state)
             elif isinstance(job_state, list):
-                o = []
-                for s in job_state:
-                    o.append(model.Job.state == s)
-                job_conditions.append(or_(*o))
+                subq = subq.where(or_(*[Job.state == s for s in job_state]))
+
+        # exclude jobs with deleted outputs
+        subq = subq.where(
+            and_(
+                model.Job.any_output_dataset_collection_instances_deleted == false(),
+                model.Job.any_output_dataset_deleted == false(),
+            )
+        )
 
         for k, v in wildcard_param_dump.items():
             wildcard_value = None
@@ -371,26 +375,26 @@ def replace_dataset_ids(path, key, value):
             if not wildcard_value:
                 value_dump = json.dumps(v, sort_keys=True)
                 wildcard_value = value_dump.replace('"id": "__id_wildcard__"', '"id": %')
-            a = aliased(model.JobParameter)
+            a = aliased(JobParameter)
             if value_dump == wildcard_value:
-                job_conditions.append(
+                subq = subq.join(a).where(
                     and_(
-                        model.Job.id == a.job_id,
+                        Job.id == a.job_id,
                         a.name == k,
                         a.value == value_dump,
                     )
                 )
             else:
-                job_conditions.append(and_(model.Job.id == a.job_id, a.name == k, a.value.like(wildcard_value)))
+                subq = subq.join(a).where(
+                    and_(
+                        Job.id == a.job_id,
+                        a.name == k,
+                        a.value.like(wildcard_value),
+                    )
+                )
 
-        job_conditions.append(
-            and_(
-                model.Job.any_output_dataset_collection_instances_deleted == false(),
-                model.Job.any_output_dataset_deleted == false(),
-            )
-        )
+        query = select([Job.id]).select_from(Job.table.join(subq, subq.c.id == Job.id))
 
-        subq = self.sa_session.query(model.Job.id).filter(*job_conditions).subquery()
         data_conditions = []
 
         # We now build the query filters that relate to the input datasets
@@ -416,14 +420,19 @@ def replace_dataset_ids(path, key, value):
                 c = aliased(model.HistoryDatasetAssociation)
                 d = aliased(model.JobParameter)
                 e = aliased(model.HistoryDatasetAssociationHistory)
+                query.add_columns(a.dataset_id)
+                used_ids.append(a.dataset_id)
+                query = query.join(a, a.job_id == model.Job.id)
                 stmt = select([model.HistoryDatasetAssociation.id]).where(
                     model.HistoryDatasetAssociation.id == e.history_dataset_association_id
                 )
+                # b is the HDA used for the job
+                query = query.join(b, a.dataset_id == b.id).join(c, c.dataset_id == b.dataset_id)
                 name_condition = []
                 if identifier:
+                    query = query.join(d)
                     data_conditions.append(
                         and_(
-                            model.Job.id == d.job_id,
                             d.name.in_({f"{_}|__identifier__" for _ in k}),
                             d.value == json.dumps(identifier),
                         )
                     )
@@ -444,10 +453,7 @@ def replace_dataset_ids(path, key, value):
                     )
                 data_conditions.append(
                     and_(
-                        a.job_id == model.Job.id,
                         a.name.in_(k),
-                        a.dataset_id == b.id,  # b is the HDA used for the job
-                        c.dataset_id == b.dataset_id,
                         c.id == v,  # c is the requested job input HDA
                         # We need to make sure that the job we are looking for has been run with identical inputs.
                         # Here we deal with 3 requirements:
@@ -466,23 +472,26 @@ def replace_dataset_ids(path, key, value):
                         or_(b.deleted == false(), c.deleted == false()),
                     )
                 )
-
-                used_ids.append(a.dataset_id)
             elif t == "ldda":
                 a = aliased(model.JobToInputLibraryDatasetAssociation)
-                data_conditions.append(and_(model.Job.id == a.job_id, a.name.in_(k), a.ldda_id == v))
+                query = query.add_columns(a.ldda_id)
+                query = query.join(a, a.job_id == model.Job.id)
+                data_conditions.append(and_(a.name.in_(k), a.ldda_id == v))
                 used_ids.append(a.ldda_id)
             elif t == "hdca":
                 a = aliased(model.JobToInputDatasetCollectionAssociation)
                 b = aliased(model.HistoryDatasetCollectionAssociation)
                 c = aliased(model.HistoryDatasetCollectionAssociation)
+                query = query.add_columns(a.dataset_collection_id)
+                query = (
+                    query.join(a, a.job_id == model.Job.id)
+                    .join(b, b.id == a.dataset_collection_id)
+                    .join(c, b.name == c.name)
+                )
                 data_conditions.append(
                     and_(
-                        model.Job.id == a.job_id,
                         a.name.in_(k),
-                        b.id == a.dataset_collection_id,
                         c.id == v,
-                        b.name == c.name,
                         or_(
                             and_(b.deleted == false(), b.id == v),
                             and_(
@@ -500,13 +509,33 @@ def replace_dataset_ids(path, key, value):
                 a = aliased(model.JobToInputDatasetCollectionElementAssociation)
                 b = aliased(model.DatasetCollectionElement)
                 c = aliased(model.DatasetCollectionElement)
+                d = aliased(model.HistoryDatasetAssociation)
+                e = aliased(model.HistoryDatasetAssociation)
+                query = query.add_columns(a.dataset_collection_element_id)
+                query = (
+                    query.join(a)
+                    .join(b, b.id == a.dataset_collection_element_id)
+                    .join(
+                        c,
+                        and_(
+                            c.element_identifier == b.element_identifier,
+                            or_(c.hda_id == b.hda_id, c.child_collection_id == b.child_collection_id),
+                        ),
+                    )
+                    .outerjoin(d, d.id == c.hda_id)
+                    .outerjoin(e, e.dataset_id == d.dataset_id)
+                )
                 data_conditions.append(
                     and_(
-                        model.Job.id == a.job_id,
                         a.name.in_(k),
-                        a.dataset_collection_element_id == b.id,
-                        b.element_identifier == c.element_identifier,
-                        c.child_collection_id == b.child_collection_id,
+                        or_(
+                            c.child_collection_id == b.child_collection_id,
+                            and_(
+                                c.hda_id == b.hda_id,
+                                d.id == c.hda_id,
+                                e.dataset_id == d.dataset_id,
+                            ),
+                        ),
                         c.id == v,
                     )
                 )
@@ -514,14 +543,9 @@ def replace_dataset_ids(path, key, value):
             else:
                 return []
 
-        query = (
-            self.sa_session.query(model.Job.id, *used_ids)
-            .join(subq, model.Job.id == subq.c.id)
-            .filter(*data_conditions)
-            .group_by(model.Job.id, *used_ids)
-            .order_by(model.Job.id.desc())
-        )
-        for job in query:
+        query = query.where(*data_conditions).group_by(model.Job.id, *used_ids).order_by(model.Job.id.desc())
+
+        for job in self.sa_session.execute(query):
             # We found a job that is equal in terms of tool_id, user, state and input datasets,
             # but to be able to verify that the parameters match we need to modify all instances of
             # dataset_ids (HDA, LDDA, HDCA) in the incoming param_dump to point to those used by the
@@ -556,7 +580,7 @@ def replace_dataset_ids(path, key, value):
                     and_(model.Job.id == a.job_id, a.name == k, a.value == json.dumps(v, sort_keys=True))
                 )
             else:
-                job_parameter_conditions = [model.Job.id == job]
+                job_parameter_conditions = [model.Job.id == job[0]]
             query = self.sa_session.query(model.Job).filter(*job_parameter_conditions)
             job = query.first()
             if job is None:
diff --git a/lib/galaxy/model/__init__.py b/lib/galaxy/model/__init__.py
index ddd59841053d..99e41676757e 100644
--- a/lib/galaxy/model/__init__.py
+++ b/lib/galaxy/model/__init__.py
@@ -1361,6 +1361,7 @@ class Job(Base, JobLike, UsesCreateAndUpdateTime, Dictifiable, Serializable):
         "create_time",
         "galaxy_version",
         "command_version",
+        "copied_from_job_id",
     ]
 
     _numeric_metric = JobMetricNumeric
@@ -10669,7 +10670,7 @@ class CleanupEventImplicitlyConvertedDatasetAssociationAssociation(Base):
 )
 
 Job.any_output_dataset_deleted = column_property(
-    exists(HistoryDatasetAssociation).where(
+    exists(HistoryDatasetAssociation.id).where(
         and_(
             Job.id == JobToOutputDatasetAssociation.job_id,
             HistoryDatasetAssociation.table.c.id == JobToOutputDatasetAssociation.dataset_id,
diff --git a/lib/galaxy/tools/execute.py b/lib/galaxy/tools/execute.py
index c69b13dae117..412c36bcefdd 100644
--- a/lib/galaxy/tools/execute.py
+++ b/lib/galaxy/tools/execute.py
@@ -457,6 +457,24 @@ def finalize_dataset_collections(self, trans):
                 )
                 trans.sa_session.add(implicit_collection.collection)
         else:
+            completed_collections = {}
+            if (
+                self.completed_jobs
+                and self.implicit_collection_jobs
+                and len(self.completed_jobs) == len(self.successful_jobs)
+            ):
+                # If the same number of implicit collection jobs went into
+                # creating the collection and those jobs are all cached
+                # the HDCA has effectively been copied.
+                # We mark this here so that the job cache query in subsequent
+                # jobs considers this to be a valid cached input.
+                completed_job_ids = {job.id for job in self.completed_jobs.values() if job}
+                if all(job.copied_from_job_id in completed_job_ids for job in self.implicit_collection_jobs.job_list):
+                    completed_collections = {
+                        jtodca.name: jtodca.dataset_collection_instance
+                        for jtodca in self.completed_jobs[0].output_dataset_collection_instances
+                    }
+            implicit_collection = None
             for i, implicit_collection in enumerate(self.implicit_collections.values()):
                 if i == 0:
                     implicit_collection_jobs = implicit_collection.implicit_collection_jobs
@@ -465,7 +483,18 @@ def finalize_dataset_collections(self, trans):
                 implicit_collection.collection.finalize(
                     collection_type_description=self.collection_info.structure.collection_type_description
                 )
+
+                # Mark implicit HDCA as copied
+                completed_implicit_collection = implicit_collection and completed_collections.get(
+                    implicit_collection.implicit_output_name
+                )
+                if completed_implicit_collection:
+                    implicit_collection.copied_from_history_dataset_collection_association_id = (
+                        completed_implicit_collection.id
+                    )
+
                 trans.sa_session.add(implicit_collection.collection)
 
         with transaction(trans.sa_session):
             trans.sa_session.commit()
diff --git a/lib/galaxy_test/api/test_workflows.py b/lib/galaxy_test/api/test_workflows.py
index 1a68c00c1ce3..7cc9849dc947 100644
--- a/lib/galaxy_test/api/test_workflows.py
+++ b/lib/galaxy_test/api/test_workflows.py
@@ -4548,6 +4548,48 @@ def test_workflow_rerun_with_use_cached_job(self):
             first_wf_output["file_name"] == second_wf_output["file_name"]
         ), f"first output:\n{first_wf_output}\nsecond output:\n{second_wf_output}"
 
+    @skip_without_tool("cat1")
+    @skip_without_tool("identifier_multiple")
+    def test_workflow_rerun_with_cached_job_consumes_implicit_hdca(self, history_id: str):
+        workflow = """
+class: GalaxyWorkflow
+inputs:
+  collection_input:
+    type: data_collection_input
+steps:
+  map_over:
+    tool_id: cat1
+    in:
+      input1: collection_input
+  consume_hdca:
+    tool_id: identifier_multiple
+    in:
+      input1: map_over/out_file1
+"""
+        workflow_id = self.workflow_populator.upload_yaml_workflow(name="Consume HDCA", yaml_content=workflow)
+        hdca1 = self.dataset_collection_populator.create_list_in_history(
+            history_id, contents=[("sample1-1", "1 2 3"), ("sample2-1", "7 8 9")]
+        ).json()
+        hdca1 = self.dataset_collection_populator.wait_for_fetched_collection(hdca1)
+        workflow_request = {
+            "inputs": json.dumps({"collection_input": self._ds_entry(hdca1)}),
+            "history": f"hist_id={history_id}",
+            "use_cached_job": True,
+            "inputs_by": "name",
+        }
+        first_invocation_summary = self.workflow_populator.invoke_workflow_and_wait(
+            workflow_id, request=workflow_request
+        ).json()
+        first_invocation = self.workflow_populator.get_invocation(first_invocation_summary["id"], step_details=True)
+        final_job_id_first_invocation = first_invocation["steps"][2]["jobs"][0]["id"]
+        second_invocation_summary = self.workflow_populator.invoke_workflow_and_wait(
+            workflow_id, request=workflow_request
+        ).json()
+        second_invocation = self.workflow_populator.get_invocation(second_invocation_summary["id"], step_details=True)
+        final_job_id_second_invocation = second_invocation["steps"][2]["jobs"][0]["id"]
+        final_job = self.dataset_populator.get_job_details(final_job_id_second_invocation, full=True).json()
+        assert final_job["copied_from_job_id"] == final_job_id_first_invocation
+
     @skip_without_tool("cat1")
     def test_nested_workflow_rerun_with_use_cached_job(self):
         with self.dataset_populator.test_history() as history_id_one, self.dataset_populator.test_history() as history_id_two:
diff --git a/test/unit/app/test_remote_shell.py b/test/unit/app/test_remote_shell.py
index 378581ba1e02..3bb83c1b678e 100644
--- a/test/unit/app/test_remote_shell.py
+++ b/test/unit/app/test_remote_shell.py
@@ -8,7 +8,7 @@
 try:
     import mockssh
 except ImportError:
-    raise unittest.SkipTest("Skipping tests that require mockssh")
+    mockssh = None
 
 from galaxy.jobs.runners.util.cli import CliInterface
 from galaxy.security.ssh_util import (
@@ -42,6 +42,8 @@ def setUpClass(cls):
         cls.cli_interface = CliInterface()
 
     def test_secure_shell_plugin_without_strict(self):
+        if not mockssh:
+            raise unittest.SkipTest("Skipping tests that require mockssh")
         with mockssh.Server(users={self.username: self.ssh_keys.private_key_file}) as server:
             self.shell_params["port"] = server.port
             self.shell_params["plugin"] = "SecureShell"
@@ -51,6 +53,8 @@ def test_secure_shell_plugin_without_strict(self):
             assert result.stdout.strip() == "hello"
 
     def test_get_shell_plugin(self):
+        if not mockssh:
+            raise unittest.SkipTest("Skipping tests that require mockssh")
         with mockssh.Server(users={self.username: self.ssh_keys.private_key_file}) as server:
             self.shell_params["port"] = server.port
             self.shell_params["plugin"] = "ParamikoShell"
@@ -58,6 +62,8 @@ def test_get_shell_plugin(self):
             assert self.shell.username == self.username
 
     def test_paramiko_shell_plugin(self):
+        if not mockssh:
+            raise unittest.SkipTest("Skipping tests that require mockssh")
         with mockssh.Server(users={self.username: self.ssh_keys.private_key_file}) as server:
             self.shell_params["port"] = server.port
             self.shell_params["plugin"] = "ParamikoShell"