Analysis refactor gui part7 #2117
Changes from 126 commits
@@ -0,0 +1,74 @@ (new file)

# -----------------------------------------------------------------------------
# Copyright (c) 2014--, The Qiita Development Team.
#
# Distributed under the terms of the BSD 3-clause License.
#
# The full license is in the file LICENSE, distributed with this software.
# -----------------------------------------------------------------------------

from json import dumps
from sys import exc_info
from time import sleep
import traceback

import qiita_db as qdb


def build_analysis_files(job):
    """Builds the files for an analysis

    Parameters
    ----------
    job : qiita_db.processing_job.ProcessingJob
        The processing job with the information for building the files
    """
    with qdb.sql_connection.TRN:
        params = job.parameters.values
        analysis_id = params['analysis']
        merge_duplicated_sample_ids = params['merge_dup_sample_ids']
        analysis = qdb.analysis.Analysis(analysis_id)
        biom_files = analysis.build_files(merge_duplicated_sample_ids)

        cmd = qdb.software.Command.get_validator('BIOM')
        val_jobs = []
        for dtype, biom_fp in biom_files:
            validate_params = qdb.software.Parameters.load(
                cmd, values_dict={'files': dumps({'biom': [biom_fp]}),
                                  'artifact_type': 'BIOM',
                                  'provenance': dumps({'job': job.id,
                                                       'data_type': dtype}),
                                  'analysis': analysis_id})
            val_jobs.append(qdb.processing_job.ProcessingJob.create(
                analysis.owner, validate_params))

        job._set_validator_jobs(val_jobs)

        for j in val_jobs:
            j.submit()
            sleep(1)


TASK_DICT = {'build_analysis_files': build_analysis_files}


def private_task(job_id):
    """Completes a Qiita private task

    Parameters
    ----------
    job_id : str
        The job id
    """
    if job_id == 'register':
        # We don't need to do anything here if Qiita is registering plugins
        return

    job = qdb.processing_job.ProcessingJob(job_id)
    job.update_heartbeat_state()

    try:
        task_name = job.command.name
        TASK_DICT[task_name](job)
    except Exception:
        job.complete(False, error="Error executing private task: %s"
                     % traceback.format_exception(*exc_info()))

Review comment: What happens if the job does not except, shouldn't there be a ...

Reply: Not necessarily, as it depends on the actual private task being executed (right now there is only one type of private task, but we will be adding more in the future). Let's say that we have job A that generates one output O. The output O needs to be validated by a different job B. The way it is currently done is that job A is marked as 'running' until the validator job B has completed. This way, if the validator fails because the output is not correct, job A is marked as failed (as it should be). However, if validator B completes successfully, it automatically marks job A as completed and successful. The current private task (build_analysis_files) takes advantage of this behavior by creating the BIOM tables and submitting the validator jobs to the queue. Once those jobs are completed, they will set the correct output status on the current job. Does it make sense?
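The delegation pattern described in that reply can be illustrated with a small, self-contained sketch. These are toy classes written for this explanation only; none of the names below are part of the qiita_db API:

class Job:
    """Toy parent job whose final status is set by its validators."""
    def __init__(self):
        self.status = 'running'
        self.validators = []

    def complete(self, success):
        self.status = 'success' if success else 'error'


class Validator:
    """Toy validator that reports back to its parent job."""
    def __init__(self, parent, output_ok):
        self.parent = parent
        self.output_ok = output_ok
        self.done = False

    def run(self):
        self.done = True
        if not self.output_ok:
            # A bad output fails the parent immediately.
            self.parent.complete(False)
        elif all(v.done and v.output_ok for v in self.parent.validators):
            # The last successful validator completes the parent.
            self.parent.complete(True)


job_a = Job()  # stands in for, e.g., build_analysis_files
job_a.validators = [Validator(job_a, True), Validator(job_a, True)]
for v in job_a.validators:
    v.run()
assert job_a.status == 'success'  # any output_ok=False would yield 'error'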
@@ -504,7 +504,8 @@ def _complete_artifact_definition(self, artifact_data):
         else:
             # The artifact is uploaded by the user or is the initial
             # artifact of an analysis
-            if job_params['analysis'] is not None:
+            if ('analysis' in job_params and
+                    job_params['analysis'] is not None):
                 pt = None
                 an = qdb.analysis.Analysis(job_params['analysis'])
                 sql = """SELECT data_type
@@ -567,11 +568,21 @@ def _complete_artifact_transformation(self, artifacts_data):
             templates = set()
             for artifact in self.input_artifacts:
                 templates.update(pt.id for pt in artifact.prep_templates)
+            template = None
+            analysis = None
             if len(templates) > 1:
                 raise qdb.exceptions.QiitaDBError(
                     "Currently only single prep template "
                     "is allowed, found %d" % len(templates))
-            template = templates.pop()
+            elif len(templates) == 1:
+                template = templates.pop()
+            else:
+                # In this case we have 0 templates. What this means is that
+                # this artifact is being generated in the analysis pipeline.
+                # All the artifacts included in the analysis pipeline
+                # belong to the same analysis, so we can just ask the
+                # first artifact for the analysis that it belongs to
+                analysis = self.input_artifacts[0].analysis.id

             # Once the validate job completes, it needs to know if it has
             # been generated from a command (and how) or if it has been
@@ -592,6 +603,7 @@ def _complete_artifact_transformation(self, artifacts_data):
                 cmd, values_dict={'files': dumps(filepaths),
                                   'artifact_type': atype,
                                   'template': template,
+                                  'analysis': analysis,
                                   'provenance': dumps(provenance)})
             validator_jobs.append(
                 ProcessingJob.create(self.user, validate_params))
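For orientation, here is how the values_dict handed to the validator differs between the two pipelines. The concrete values are hypothetical; only the shape follows the diff above (exactly one of template/analysis is set):

from json import dumps

filepaths = {'biom': ['/path/to/table.biom']}  # hypothetical
provenance = {'job': 'some-job-id'}            # hypothetical

# Study pipeline: the artifact descends from a single prep template.
study_values = {'files': dumps(filepaths), 'artifact_type': 'BIOM',
                'template': 42, 'analysis': None,
                'provenance': dumps(provenance)}

# Analysis pipeline: no prep template exists, so the analysis id of the
# first input artifact is carried instead.
analysis_values = {'files': dumps(filepaths), 'artifact_type': 'BIOM',
                   'template': None, 'analysis': 7,
                   'provenance': dumps(provenance)}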
@@ -1196,7 +1208,7 @@ def _raise_if_not_in_construction(self):
                      WHERE processing_job_workflow_id = %s"""
             qdb.sql_connection.TRN.add(sql, [self.id])
             res = qdb.sql_connection.TRN.execute_fetchflatten()
-            if len(res) != 1 or res[0] != 'in_construction':
+            if (len(res) == 1 and res[0] != 'in_construction') or len(res) > 1:
                 # The workflow is no longer in construction, raise an error
                 raise qdb.exceptions.QiitaDBOperationNotPermittedError(
                     "Workflow not in construction")

Review comment: Could you add some documentation of each test? I'm not following why/when len(res) > 1 ...

Reply: Added a comment
@@ -44,7 +44,8 @@ class Command(qdb.base.QiitaObject):
     _table = "software_command"

     @classmethod
-    def get_commands_by_input_type(cls, artifact_types, active_only=True):
+    def get_commands_by_input_type(cls, artifact_types, active_only=True,
+                                   exclude_analysis=True):
         """Returns the commands that can process the given artifact types

         Parameters
@@ -70,6 +71,8 @@ def get_commands_by_input_type(cls, artifact_types, active_only=True):
                      WHERE artifact_type IN %s"""
             if active_only:
                 sql += " AND active = True"
+            if exclude_analysis:
+                sql += " AND is_analysis = False"
             qdb.sql_connection.TRN.add(sql, [tuple(artifact_types)])
             for c_id in qdb.sql_connection.TRN.execute_fetchflatten():
                 yield cls(c_id)
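A sketch of how a caller might use the new keyword. These call sites and the 'BIOM' argument are hypothetical, not taken from this diff:

import qiita_db as qdb

# Study pipeline views keep the default (exclude_analysis=True) and
# therefore never see analysis-only commands.
study_cmds = list(qdb.software.Command.get_commands_by_input_type(['BIOM']))

# Analysis pipeline views opt in to the full set.
analysis_cmds = list(qdb.software.Command.get_commands_by_input_type(
    ['BIOM'], exclude_analysis=False))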
@@ -191,7 +194,8 @@ def exists(cls, software, name):
             return qdb.sql_connection.TRN.execute_fetchlast()

     @classmethod
-    def create(cls, software, name, description, parameters, outputs=None):
+    def create(cls, software, name, description, parameters, outputs=None,
+               analysis_only=False):
         r"""Creates a new command in the system

         The supported types for the parameters are:
@@ -222,6 +226,9 @@ def create(cls, software, name, description, parameters, outputs=None):
         outputs : dict, optional
             The description of the outputs that this command generated. The
             format is: {output_name: artifact_type}
+        analysis_only : bool, optional
+            If true, then the command will only be available on the analysis
+            pipeline. Default: False.

         Returns
         -------

Review comment: Does false mean the command is in the analysis pipeline and whatever other pipelines exist? Is it important to be able to restrict commands to non-analysis pipelines? Or, rather, is a two-state variable sufficient to describe the different scenarios faced?

Reply: We currently have two pipelines using the artifacts structure: the study pipeline and the (meta-)analysis pipeline. In my opinion, commands in the study pipeline should be available in the meta-analysis pipeline (my example command was open-ref, although we are no longer adding it to the system), but we do want to restrict the analysis pipeline commands on the study pipeline. Given the current status of the project and what we have outlined so far for the future of Qiita, I don't foresee adding any other pipeline to the system that will use the artifacts structure as it currently stands.

Review comment: What is the logical difference between a study command and an analysis command?
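For illustration, creating a command restricted to the analysis pipeline might look like this. The software object, name, description, and parameters dict are placeholders; only the analysis_only keyword comes from this diff:

import qiita_db as qdb

# `software` and `parameters` stand in for real values here.
cmd = qdb.software.Command.create(
    software, 'build_analysis_files',
    'Builds the files for an analysis',
    parameters, analysis_only=True)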
@@ -297,10 +304,10 @@ def create(cls, software, name, description, parameters, outputs=None):
                     % (software.id, name))
             # Add the command to the DB
             sql = """INSERT INTO qiita.software_command
-                        (name, software_id, description)
-                     VALUES (%s, %s, %s)
+                        (name, software_id, description, is_analysis)
+                     VALUES (%s, %s, %s, %s)
                      RETURNING command_id"""
-            sql_params = [name, software.id, description]
+            sql_params = [name, software.id, description, analysis_only]
             qdb.sql_connection.TRN.add(sql, sql_params)
             c_id = qdb.sql_connection.TRN.execute_fetchlast()
@@ -508,6 +515,22 @@ def activate(self):
             qdb.sql_connection.TRN.add(sql, [True, self.id])
             return qdb.sql_connection.TRN.execute()

+    @property
+    def analysis_only(self):
+        """Returns if the command is an analysis-only command
+
+        Returns
+        -------
+        bool
+            Whether the command is analysis only or not
+        """
+        with qdb.sql_connection.TRN:
+            sql = """SELECT is_analysis
+                     FROM qiita.software_command
+                     WHERE command_id = %s"""
+            qdb.sql_connection.TRN.add(sql, [self.id])
+            return qdb.sql_connection.TRN.execute_fetchlast()
+

 class Software(qdb.base.QiitaObject):
     r"""A software package available in the system

Review comment: Can a command be both an analysis command and "other"? I guess I don't understand what is under the umbrella of "analysis" and what is not.

Reply: There are currently two options: a command is either available in both pipelines, or it is analysis only. I don't think the option "Only on study processing pipeline" should be added, as this can potentially limit the opportunities to run meta-analyses; see my previous comment.
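A compact restatement of the two-option model discussed above. This is an illustration inferred from the diff and the replies, not code from the PR; visible_in is a hypothetical helper:

def visible_in(command, pipeline):
    """Hypothetical helper: where a command may be offered.

    analysis_only=False (default): available in both the study and the
    (meta-)analysis pipelines. analysis_only=True: analysis pipeline
    only. There is deliberately no "study pipeline only" state, so any
    study command can also be reused in a meta-analysis.
    """
    if pipeline == 'analysis':
        return True                   # every command is usable here
    return not command.analysis_only  # study pipeline hides analysis-only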
Review thread on the j.submit() / sleep(1) loop in build_analysis_files:

Review comment: Just thinking out loud (not sure if that applies to written stuff, anyway ...), but should we add the sleep directly to j.submit?

Reply: I would disagree, because the sleep is only needed when submitting multiple jobs; if you're submitting a single one it is not needed, so there is no reason to stop the processing.

Review comment: Why sleep?

Reply: In some cases we found that if we programmatically submit multiple jobs to the torque queue, the resource queues are not updated accordingly and the jobs end up not being spawned fairly across the cluster.

Review comment: The reason for the delays historically was to spread load on the resource manager and scheduler, as they are singleton resources within a shared environment. Generally speaking, the delays were only important for large volumes of submissions (e.g., > 100); I don't think we're doing that here. If there are issues where the scheduler or resource manager are not handling a small number of jobs appropriately, then it suggests either the jobs themselves are flawed or there is a critical issue with the servers handling the requests. Since we routinely hammer these servers outside of Qiita without delays, it suggests the former is the more likely scenario. If you're not comfortable removing the delay, then at least set it to <= 100 ms, as 1 s is a lifetime. I wasn't aware Qiita used a library that links to the Torque C API; aren't these programmatic requests actually system calls?
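If the delay is kept, one low-risk way to act on this feedback is to pause only between submissions and make the interval configurable. This is a sketch of that idea, not code from the PR; submit_all and its 0.1 s default are hypothetical:

from time import sleep

def submit_all(jobs, delay=0.1):
    """Submit each job, pausing `delay` seconds between submissions.

    The pause is skipped after the last job, so a single submission
    incurs no delay, and the interval can be tuned (or set to 0)
    without touching the submission logic.
    """
    for i, job in enumerate(jobs):
        job.submit()
        if i < len(jobs) - 1:
            sleep(delay)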