-
Couldn't load subscription status.
- Fork 79
EBI step 2 #1450
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
EBI step 2 #1450
Changes from 3 commits
e348327
e079517
efdb741
7112d08
58d0a7e
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -6,12 +6,12 @@ | |
| # The full license is in the file LICENSE, distributed with this software. | ||
| # ----------------------------------------------------------------------------- | ||
|
|
||
| from os import makedirs | ||
| from os.path import join, isdir | ||
| from os import makedirs, remove | ||
| from shutil import rmtree | ||
| from functools import partial | ||
| from tempfile import mkdtemp | ||
| from gzip import open as gzopen | ||
| from tarfile import open as taropen | ||
| from tempfile import mkdtemp | ||
| from moi.job import system_call | ||
|
|
||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. delete this blank line |
||
| from qiita_db.study import Study | ||
|
|
@@ -20,9 +20,7 @@ | |
| from qiita_core.qiita_settings import qiita_config | ||
|
|
||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. delete this blank line |
||
| from qiita_ware.ebi import EBISubmission | ||
| from qiita_ware.demux import to_per_sample_ascii | ||
| from qiita_ware.exceptions import ComputeError | ||
| from qiita_ware.util import open_file | ||
|
|
||
|
|
||
| def submit_EBI(preprocessed_data_id, action, send, fastq_dir_fp=None): | ||
|
|
@@ -44,53 +42,25 @@ def submit_EBI(preprocessed_data_id, action, send, fastq_dir_fp=None): | |
| If fastq_dir_fp is passed, it must not contain any empty files, or | ||
| gzipped empty files | ||
| """ | ||
|
|
||
| preprocessed_data = PreprocessedData(preprocessed_data_id) | ||
| preprocessed_data_id_str = str(preprocessed_data_id) | ||
| study = Study(preprocessed_data.study) | ||
| sample_template = SampleTemplate(study.sample_template) | ||
| prep_template = PrepTemplate(preprocessed_data.prep_template) | ||
|
|
||
| investigation_type = None | ||
| new_investigation_type = None | ||
|
|
||
| if send: | ||
| # If we intend actually to send the files, then change the status in | ||
| # the database | ||
| preprocessed_data.update_insdc_status('submitting') | ||
|
|
||
| if fastq_dir_fp is not None: | ||
| # If the user specifies a FASTQ directory, use it | ||
|
|
||
| # Set demux_samples to None so that MetadataTemplate.to_file will put | ||
| # all samples in the template files | ||
| demux_samples = None | ||
| else: | ||
| # If the user does not specify a FASTQ directory, create one and | ||
| # re-serialize the per-sample FASTQs from the demux file | ||
| fastq_dir_fp = mkdtemp(prefix=qiita_config.working_dir) | ||
| demux = [path for _, path, ftype in preprocessed_data.get_filepaths() | ||
| if ftype == 'preprocessed_demux'][0] | ||
|
|
||
| # Keep track of which files were actually in the demux file so that we | ||
| # can write those rows to the prep and samples templates | ||
| demux_samples = set() | ||
|
|
||
| with open_file(demux) as demux_fh: | ||
| for samp, iterator in to_per_sample_ascii(demux_fh, | ||
| list(sample_template)): | ||
| demux_samples.add(samp) | ||
| sample_fp = join(fastq_dir_fp, "%s.fastq.gz" % samp) | ||
| wrote_sequences = False | ||
| with gzopen(sample_fp, 'w') as fh: | ||
| for record in iterator: | ||
| fh.write(record) | ||
| wrote_sequences = True | ||
|
|
||
| if not wrote_sequences: | ||
| remove(sample_fp) | ||
|
|
||
| # step 1 | ||
| ebi_submission = EBISubmission(preprocessed_data_id, action) | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think we might want to change the signature on the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Agree on comment about requirements for UPDATE. However, I think sending the preprocessed_data_id is the way to go, as from there we know which submission is being updated and retrieve all study and sample information. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yep, that makes sense. During the submission process, we should add to the database the EBI aliases for everything that is submitted, so that in the case of an update, we can retrieve them easily. |
||
|
|
||
| # step 2 | ||
| ebi_submission.preprocessed_data.update_insdc_status('demuxing samples') | ||
|
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Does this even work? I thought that the insdc_status was a controlled vocab, except for the There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Not sure, but at this point I'm just setting this section up as it should work and then cleaning up as needed. Note that the issue is that there are no tests for this file ... I'm going to need to add them in a future iteration. Already added it to the joy list: #1451 There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Thanks, I'm unsure if this function even needs to exist... We probably only want to use the parallel wrapper directly and this function should disappear. However, we will see as the code evolves... |
||
| try: | ||
| demux_samples = ebi_submission.generate_demultiplexed_fastq() | ||
| except Exception as e: | ||
| if isdir(ebi_submission.ebi_dir): | ||
| rmtree(ebi_submission.ebi_dir) | ||
| ebi_submission.preprocessed_data.update_insdc_status( | ||
| 'failed: %s' % str(e)) | ||
|
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Not sure what others think, but I think the full traceback should be logged in the Log table. It will be especially useful for debugging obscure problems. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I was thinking the same thing. 👍 |
||
| raise | ||
|
|
||
| # other steps | ||
| output_dir = fastq_dir_fp + '_submission' | ||
| sample_template = ebi_submission.sample_template | ||
| prep_template = ebi_submission.prep_template | ||
| preprocessed_data = ebi_submission.preprocessed_data | ||
|
|
||
| samp_fp = join(fastq_dir_fp, 'sample_metadata.txt') | ||
| prep_fp = join(fastq_dir_fp, 'prep_metadata.txt') | ||
|
|
@@ -112,19 +82,12 @@ def submit_EBI(preprocessed_data_id, action, send, fastq_dir_fp=None): | |
| raise IOError('The output folder already exists: %s' % | ||
| output_dir) | ||
|
|
||
| with open(samp_fp, 'U') as st, open(prep_fp, 'U') as pt: | ||
| submission = EBISubmission.from_templates_and_per_sample_fastqs( | ||
| preprocessed_data_id_str, study.title, | ||
| study.info['study_abstract'], investigation_type, st, pt, | ||
| fastq_dir_fp, new_investigation_type=new_investigation_type, | ||
| pmids=study.pmids) | ||
|
|
||
| submission.write_all_xml_files(study_fp, sample_fp, experiment_fp, run_fp, | ||
| submission_fp, action) | ||
| ebi_submission.write_all_xml_files(study_fp, sample_fp, experiment_fp, | ||
| run_fp, submission_fp, action) | ||
|
|
||
| if send: | ||
| submission.send_sequences() | ||
| study_accession, submission_accession = submission.send_xml() | ||
| ebi_submission.send_sequences() | ||
| study_accession, submission_accession = ebi_submission.send_xml() | ||
|
|
||
| if study_accession is None or submission_accession is None: | ||
| preprocessed_data.update_insdc_status('failed') | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -3,24 +3,27 @@ | |
| from subprocess import call | ||
| from shlex import split as shsplit | ||
| from glob import glob | ||
| from os.path import basename, exists, join, split | ||
| from os import environ, close | ||
| from datetime import date, timedelta, datetime | ||
| from os.path import basename, exists, join, split, isdir, isfile | ||
| from os import environ, close, makedirs, remove, listdir | ||
| from datetime import date, timedelta | ||
| from xml.etree import ElementTree as ET | ||
| from xml.dom import minidom | ||
| from xml.sax.saxutils import escape | ||
| from gzip import open as gzopen | ||
|
|
||
| from future.utils import viewitems | ||
| from skbio.util import safe_md5 | ||
|
|
||
| from qiita_core.qiita_settings import qiita_config | ||
| from qiita_ware.exceptions import EBISumbissionError | ||
| from qiita_ware.demux import to_per_sample_ascii | ||
| from qiita_ware.util import open_file | ||
| from qiita_db.logger import LogEntry | ||
| from qiita_db.ontology import Ontology | ||
| from qiita_db.util import convert_to_id | ||
| from qiita_db.study import Study | ||
| from qiita_db.data import PreprocessedData | ||
| from qiita_db.metadata_template import PrepTemplate | ||
| from qiita_db.metadata_template import PrepTemplate, SampleTemplate | ||
|
|
||
|
|
||
| class InvalidMetadataError(Exception): | ||
|
|
@@ -117,19 +120,20 @@ def __init__(self, preprocessed_data_id, action): | |
| ', '.join(valid_ebi_actions), action)) | ||
|
|
||
| ena_ontology = Ontology(convert_to_id('ENA', 'ontology')) | ||
| ppd = PreprocessedData(preprocessed_data_id) | ||
| s = Study(ppd.study) | ||
| pt = PrepTemplate(ppd.prep_template) | ||
| self.preprocessed_data = PreprocessedData(preprocessed_data_id) | ||
| s = Study(self.preprocessed_data.study) | ||
| self.sample_template = SampleTemplate(s.sample_template) | ||
| self.prep_template = PrepTemplate(self.preprocessed_data.prep_template) | ||
|
|
||
| status = ppd.submitted_to_insdc_status() | ||
| status = self.preprocessed_data.submitted_to_insdc_status() | ||
| if status in valid_ebi_submission_states: | ||
| raise ValueError("Cannot resubmit! Current status is: %s" % status) | ||
|
|
||
| self.preprocessed_data_id = preprocessed_data_id | ||
| self.study_title = s.title | ||
| self.study_abstract = s.info['study_abstract'] | ||
|
|
||
| it = pt.investigation_type | ||
| it = self.prep_template.investigation_type | ||
| key = it | ||
| if it in ena_ontology.terms: | ||
| self.investigation_type = it | ||
|
|
@@ -143,8 +147,9 @@ def __init__(self, preprocessed_data_id, action): | |
| "term is neither one of the official terms nor " | ||
| "one of the user-defined terms in the ENA " | ||
| "ontology") | ||
| ts = datetime.now().strftime('%Y_%m_%d_%H:%M:%S') | ||
| self.ebi_dir = 'ebi_submission_%s_%s' % (preprocessed_data_id, ts) | ||
|
|
||
| self.ebi_dir = '%s/ebi_submission_%s' % (qiita_config.working_dir, | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Preferably, this should be using There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 👍 |
||
| preprocessed_data_id) | ||
| self.sequence_files = [] | ||
| self.study_xml_fp = None | ||
| self.sample_xml_fp = None | ||
|
|
@@ -166,20 +171,6 @@ def __init__(self, preprocessed_data_id, action): | |
| # This will hold the submission's samples, keyed by the sample name | ||
| self.samples = {} | ||
|
|
||
| def _stringify_kwargs(self, kwargs_dict): | ||
| """Turns values in a dictionay into strings, None, or self.empty_value | ||
| """ | ||
| try: | ||
| result = { | ||
| str(k): str(v) if v is not None else self.empty_value | ||
| for k, v in viewitems(kwargs_dict)} | ||
| return result | ||
| except ValueError: | ||
| raise InvalidMetadataError("All additional metadata passed via " | ||
| "kwargs to the EBISubmission " | ||
| "constructor must be representatable " | ||
| "as strings.") | ||
|
|
||
| def _get_study_alias(self): | ||
| """Format alias using ``self.preprocessed_data_id``""" | ||
| study_alias_format = '%s_ppdid_%s' | ||
|
|
@@ -286,7 +277,7 @@ def generate_study_xml(self): | |
| return study_set | ||
|
|
||
| def add_sample(self, sample_name, taxon_id, scientific_name, | ||
| description, **kwargs): | ||
| description): | ||
| """Adds sample information to the current submission | ||
|
|
||
| Parameters | ||
|
|
@@ -298,7 +289,6 @@ def add_sample(self, sample_name, taxon_id, scientific_name, | |
| scientific_name : str | ||
| NCBI's scientific name for the `taxon_id` | ||
| description : str | ||
|
|
||
| Defaults to ``None``. If not provided, the `empty_value` will be | ||
| used for the description | ||
|
|
||
|
|
@@ -324,11 +314,6 @@ def add_sample(self, sample_name, taxon_id, scientific_name, | |
| self.samples[sample_name]['description'] = escape( | ||
| clean_whitespace(description)) | ||
|
|
||
| self.samples[sample_name]['attributes'] = self._stringify_kwargs( | ||
| kwargs) | ||
|
|
||
| self.samples[sample_name]['prep'] = None | ||
|
|
||
| def generate_sample_xml(self): | ||
| """Generates the sample XML file | ||
|
|
||
|
|
@@ -363,18 +348,11 @@ def generate_sample_xml(self): | |
| description.text = escape(clean_whitespace( | ||
| sample_info['description'])) | ||
|
|
||
| if sample_info['attributes']: | ||
| sample_attributes = ET.SubElement(sample, 'SAMPLE_ATTRIBUTES') | ||
| self._add_dict_as_tags_and_values(sample_attributes, | ||
| 'SAMPLE_ATTRIBUTE', | ||
| sample_info['attributes']) | ||
|
|
||
| return sample_set | ||
|
|
||
| def add_sample_prep(self, sample_name, platform, file_type, file_path, | ||
| experiment_design_description, | ||
| library_construction_protocol, | ||
| **kwargs): | ||
| library_construction_protocol): | ||
| """Add prep info for an existing sample | ||
|
|
||
| Parameters | ||
|
|
@@ -395,29 +373,21 @@ def add_sample_prep(self, sample_name, platform, file_type, file_path, | |
| KeyError | ||
| If `sample_name` is not in the list of samples in the | ||
| ``EBISubmission`` object | ||
| KeyError | ||
| If there is already prep info associated with the specified sample | ||
| ValueError | ||
| If the platform is not one of the recognized platforms | ||
| """ | ||
| if sample_name not in self.samples: | ||
| raise KeyError("Sample %s: sample has not yet been associated " | ||
| "with the submission.") | ||
|
|
||
| if self.samples[sample_name]['prep']: | ||
| raise KeyError("Sample %s: multiple rows in prep with this sample " | ||
| "id!" % sample_name) | ||
|
|
||
| platforms = ['LS454', 'ILLUMINA', 'UNKNOWN'] | ||
| if platform.upper() not in platforms: | ||
| raise ValueError("The platform name %s is invalid, must be one of " | ||
| "%s (case insensitive)" % (platform, | ||
| ', '.join(platforms))) | ||
|
|
||
| self.sequence_files.append(file_path) | ||
| prep_info = self._stringify_kwargs(kwargs) | ||
| if prep_info is None: | ||
| prep_info = {} | ||
| prep_info = {} | ||
| prep_info['platform'] = platform | ||
| prep_info['file_type'] = file_type | ||
| prep_info['file_path'] = file_path | ||
|
|
@@ -825,7 +795,7 @@ def add_samples_from_templates(self, sample_template, prep_template, | |
| scientific_name, description)) | ||
|
|
||
| self.add_sample(sample_name, taxon_id, scientific_name, | ||
| description, **sample) | ||
| description) | ||
|
|
||
| prep_template_samples = [] | ||
| for prep in iter_file_via_list_of_dicts(prep_template): | ||
|
|
@@ -840,8 +810,7 @@ def add_samples_from_templates(self, sample_template, prep_template, | |
| file_path = join(per_sample_fastq_dir, sample_name+'.fastq.gz') | ||
| self.add_sample_prep(sample_name, platform, 'fastq', | ||
| file_path, experiment_design_description, | ||
| library_construction_protocol, | ||
| **prep) | ||
| library_construction_protocol) | ||
|
|
||
| to_remove = set(self.samples).difference(prep_template_samples) | ||
| for sample in to_remove: | ||
|
|
@@ -981,3 +950,52 @@ def send_xml(self): | |
| print "FAILED" | ||
|
|
||
| return (study_accession, submission_accession) | ||
|
|
||
| def generate_demultiplexed_fastq(self): | ||
| """Generates demultiplexed fastq | ||
|
|
||
| Returns | ||
| ------- | ||
| demux_samples | ||
| List of successful demultiplexed samples | ||
|
|
||
| Notes | ||
| ----- | ||
| - As a performace feature, this method will check if self.ebi_dir | ||
|
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. This seems like a fine performance feature, but what would be a way to bypass this? i.e. a case where you know that there's something wrong with the generated per-sample FASTQ files and (luckily) your submission failed before? Maybe I am missing something but it seems like the only solution would be to delete the files manually? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. If this fails the code will remove the folder so I really do not see how that can happen. Anyway, if you feel strongly about this, I can add a parameter 'regenerate' that defaults to FALSE ... |
||
| already exist and, if it does, the script will assume that in a | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. exist -> exists |
||
| previous execution this step was performed correctly and will simply | ||
| read the file names from self.ebi_dir | ||
| """ | ||
| ppd = self.preprocessed_data | ||
|
|
||
| if not isdir(self.ebi_dir): | ||
| makedirs(self.ebi_dir) | ||
|
|
||
| demux = [path for _, path, ftype in ppd.get_filepaths() | ||
| if ftype == 'preprocessed_demux'][0] | ||
|
|
||
| demux_samples = set() | ||
| with open_file(demux) as demux_fh: | ||
| for s, i in to_per_sample_ascii(demux_fh, | ||
| self.sample_template.keys()): | ||
| sample_fp = join(self.ebi_dir, "%s.fastq.gz" % s) | ||
| wrote_sequences = False | ||
| with gzopen(sample_fp, 'w') as fh: | ||
| for record in i: | ||
| fh.write(record) | ||
| wrote_sequences = True | ||
|
|
||
| if wrote_sequences: | ||
| demux_samples.add(s) | ||
| else: | ||
| remove(sample_fp) | ||
|
|
||
| else: | ||
|
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Just to make sure that I understand what is going on here. Is this else covering the case in which the per-sample FASTQs have already been generated? If so, can you add a comment to make it clear? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Done as a Note. |
||
| demux_samples = set() | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think this whole block could be simplified using There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Just to be clear what I mean is that you could do |
||
| extension = '.fastq.gz' | ||
| extension_len = len(extension) | ||
| for f in listdir(self.ebi_dir): | ||
| if isfile(join(self.ebi_dir, f)) and f.endswith(extension): | ||
| demux_samples.add(f[:-extension_len]) | ||
|
|
||
| return demux_samples | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
add blank line before this
moiimport