Merged
87 changes: 25 additions & 62 deletions qiita_ware/commands.py
@@ -6,12 +6,12 @@
# The full license is in the file LICENSE, distributed with this software.
# -----------------------------------------------------------------------------

from os import makedirs
from os.path import join, isdir
from os import makedirs, remove
from shutil import rmtree
from functools import partial
from tempfile import mkdtemp
from gzip import open as gzopen
from tarfile import open as taropen
from tempfile import mkdtemp
from moi.job import system_call
Contributor: add blank line before this moi import

Contributor: delete this blank line

from qiita_db.study import Study
@@ -20,9 +20,7 @@
from qiita_core.qiita_settings import qiita_config

Contributor: delete this blank line

from qiita_ware.ebi import EBISubmission
from qiita_ware.demux import to_per_sample_ascii
from qiita_ware.exceptions import ComputeError
from qiita_ware.util import open_file


def submit_EBI(preprocessed_data_id, action, send, fastq_dir_fp=None):
@@ -44,53 +42,25 @@ def submit_EBI(preprocessed_data_id, action, send, fastq_dir_fp=None):
If fastq_dir_fp is passed, it must not contain any empty files, or
gzipped empty files
"""

preprocessed_data = PreprocessedData(preprocessed_data_id)
preprocessed_data_id_str = str(preprocessed_data_id)
study = Study(preprocessed_data.study)
sample_template = SampleTemplate(study.sample_template)
prep_template = PrepTemplate(preprocessed_data.prep_template)

investigation_type = None
new_investigation_type = None

if send:
# If we intend actually to send the files, then change the status in
# the database
preprocessed_data.update_insdc_status('submitting')

if fastq_dir_fp is not None:
# If the user specifies a FASTQ directory, use it

# Set demux_samples to None so that MetadataTemplate.to_file will put
# all samples in the template files
demux_samples = None
else:
# If the user does not specify a FASTQ directory, create one and
# re-serialize the per-sample FASTQs from the demux file
fastq_dir_fp = mkdtemp(prefix=qiita_config.working_dir)
demux = [path for _, path, ftype in preprocessed_data.get_filepaths()
if ftype == 'preprocessed_demux'][0]

# Keep track of which files were actually in the demux file so that we
# can write those rows to the prep and samples templates
demux_samples = set()

with open_file(demux) as demux_fh:
for samp, iterator in to_per_sample_ascii(demux_fh,
list(sample_template)):
demux_samples.add(samp)
sample_fp = join(fastq_dir_fp, "%s.fastq.gz" % samp)
wrote_sequences = False
with gzopen(sample_fp, 'w') as fh:
for record in iterator:
fh.write(record)
wrote_sequences = True

if not wrote_sequences:
remove(sample_fp)

# step 1
ebi_submission = EBISubmission(preprocessed_data_id, action)
Contributor: I think we might want to change the signature of the EBISubmission constructor. What needs to be supplied for an ADD action versus an UPDATE action is quite different (and an UPDATE cannot be performed using only the information in a prep template).

Member Author: Agreed on the requirements for UPDATE. However, I think passing the preprocessed_data_id is the way to go: from there we know which submission is being updated and can retrieve all of the study and sample information.

Contributor: Yep, that makes sense. During the submission process we should store in the database the EBI aliases for everything that is submitted, so that in the case of an update we can retrieve them easily.

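To make the ADD/UPDATE distinction concrete, here is a minimal, hypothetical sketch (not this PR's code) of how the constructor could branch on the action. `VALID_EBI_ACTIONS` and `lookup_previous_accessions` are placeholder names assumed for illustration, not existing Qiita APIs.

```python
# Hypothetical sketch only, not part of this PR.
VALID_EBI_ACTIONS = ('ADD', 'VALIDATE', 'UPDATE', 'MODIFY')  # assumed values


def lookup_previous_accessions(preprocessed_data_id):
    """Placeholder for a DB lookup of EBI aliases stored by a previous ADD."""
    return None  # pretend nothing has been stored yet


class EBISubmissionSketch(object):
    def __init__(self, preprocessed_data_id, action):
        if action not in VALID_EBI_ACTIONS:
            raise ValueError("%s is not a valid EBI action" % action)
        self.preprocessed_data_id = preprocessed_data_id
        self.action = action
        if action == 'UPDATE':
            # An UPDATE cannot be built from the prep template alone; it needs
            # the accessions/aliases assigned during the earlier ADD.
            accessions = lookup_previous_accessions(preprocessed_data_id)
            if accessions is None:
                raise ValueError("Cannot UPDATE preprocessed data %s: no "
                                 "previous EBI submission on record"
                                 % preprocessed_data_id)
            self.accessions = accessions
```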
# step 2
ebi_submission.preprocessed_data.update_insdc_status('demuxing samples')
Contributor: Does this even work? I thought that the insdc_status was a controlled vocabulary, except for the "failed: ..." status.

Member Author: Not sure, but at this point I'm just writing this section the way it should work and will clean it up as needed. Note that the real issue is that there are no tests for this file; I'll need to add them in a future iteration. Already added it to the joy list: #1451

Contributor: Thanks. I'm unsure this function even needs to exist... we probably only want to use the parallel wrapper directly, and this function should disappear. However, we will see as the code evolves.

try:
demux_samples = ebi_submission.generate_demultiplexed_fastq()
except Exception as e:
if isdir(ebi_submission.ebi_dir):
rmtree(ebi_submission.ebi_dir)
ebi_submission.preprocessed_data.update_insdc_status(
'failed: %s' % str(e))
Contributor: Not sure what others think, but I believe the full traceback should be logged in the Log table. It will be especially useful for debugging obscure problems.

Contributor: I was thinking the same thing. 👍

raise
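A hedged sketch of that logging idea, mirroring the try/except in this diff (`ebi_submission` comes from the surrounding code) and assuming `qiita_db.logger.LogEntry.create` accepts a severity label and a message; check the actual signature before relying on it.

```python
from traceback import format_exc

from qiita_db.logger import LogEntry

try:
    demux_samples = ebi_submission.generate_demultiplexed_fastq()
except Exception as e:
    # Store the full traceback, not just str(e), so obscure failures can be
    # debugged later from the Log table.
    LogEntry.create('Runtime', format_exc())  # assumed signature
    ebi_submission.preprocessed_data.update_insdc_status(
        'failed: %s' % str(e))
    raise
```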

# other steps
output_dir = fastq_dir_fp + '_submission'
sample_template = ebi_submission.sample_template
prep_template = ebi_submission.prep_template
preprocessed_data = ebi_submission.preprocessed_data

samp_fp = join(fastq_dir_fp, 'sample_metadata.txt')
prep_fp = join(fastq_dir_fp, 'prep_metadata.txt')
@@ -112,19 +82,12 @@ def submit_EBI(preprocessed_data_id, action, send, fastq_dir_fp=None):
raise IOError('The output folder already exists: %s' %
output_dir)

with open(samp_fp, 'U') as st, open(prep_fp, 'U') as pt:
submission = EBISubmission.from_templates_and_per_sample_fastqs(
preprocessed_data_id_str, study.title,
study.info['study_abstract'], investigation_type, st, pt,
fastq_dir_fp, new_investigation_type=new_investigation_type,
pmids=study.pmids)

submission.write_all_xml_files(study_fp, sample_fp, experiment_fp, run_fp,
submission_fp, action)
ebi_submission.write_all_xml_files(study_fp, sample_fp, experiment_fp,
run_fp, submission_fp, action)

if send:
submission.send_sequences()
study_accession, submission_accession = submission.send_xml()
ebi_submission.send_sequences()
study_accession, submission_accession = ebi_submission.send_xml()

if study_accession is None or submission_accession is None:
preprocessed_data.update_insdc_status('failed')
122 changes: 70 additions & 52 deletions qiita_ware/ebi.py
@@ -3,24 +3,27 @@
from subprocess import call
from shlex import split as shsplit
from glob import glob
from os.path import basename, exists, join, split
from os import environ, close
from datetime import date, timedelta, datetime
from os.path import basename, exists, join, split, isdir, isfile
from os import environ, close, makedirs, remove, listdir
from datetime import date, timedelta
from xml.etree import ElementTree as ET
from xml.dom import minidom
from xml.sax.saxutils import escape
from gzip import open as gzopen

from future.utils import viewitems
from skbio.util import safe_md5

from qiita_core.qiita_settings import qiita_config
from qiita_ware.exceptions import EBISumbissionError
from qiita_ware.demux import to_per_sample_ascii
from qiita_ware.util import open_file
from qiita_db.logger import LogEntry
from qiita_db.ontology import Ontology
from qiita_db.util import convert_to_id
from qiita_db.study import Study
from qiita_db.data import PreprocessedData
from qiita_db.metadata_template import PrepTemplate
from qiita_db.metadata_template import PrepTemplate, SampleTemplate


class InvalidMetadataError(Exception):
@@ -117,19 +120,20 @@ def __init__(self, preprocessed_data_id, action):
', '.join(valid_ebi_actions), action))

ena_ontology = Ontology(convert_to_id('ENA', 'ontology'))
ppd = PreprocessedData(preprocessed_data_id)
s = Study(ppd.study)
pt = PrepTemplate(ppd.prep_template)
self.preprocessed_data = PreprocessedData(preprocessed_data_id)
s = Study(self.preprocessed_data.study)
self.sample_template = SampleTemplate(s.sample_template)
self.prep_template = PrepTemplate(self.preprocessed_data.prep_template)

status = ppd.submitted_to_insdc_status()
status = self.preprocessed_data.submitted_to_insdc_status()
if status in valid_ebi_submission_states:
raise ValueError("Cannot resubmit! Current status is: %s" % status)

self.preprocessed_data_id = preprocessed_data_id
self.study_title = s.title
self.study_abstract = s.info['study_abstract']

it = pt.investigation_type
it = self.prep_template.investigation_type
key = it
if it in ena_ontology.terms:
self.investigation_type = it
@@ -143,8 +147,9 @@ def __init__(self, preprocessed_data_id, action):
"term is neither one of the official terms nor "
"one of the user-defined terms in the ENA "
"ontology")
ts = datetime.now().strftime('%Y_%m_%d_%H:%M:%S')
self.ebi_dir = 'ebi_submission_%s_%s' % (preprocessed_data_id, ts)

self.ebi_dir = '%s/ebi_submission_%s' % (qiita_config.working_dir,
Contributor: Preferably, this should use path.join instead of string formatting.

Contributor: 👍

preprocessed_data_id)
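A small sketch of that path.join suggestion; `build_ebi_dir` is a hypothetical helper name used only for illustration (in the actual constructor the result would be assigned to `self.ebi_dir`).

```python
from os.path import join

from qiita_core.qiita_settings import qiita_config


def build_ebi_dir(preprocessed_data_id):
    """Return the per-submission working directory path via os.path.join."""
    return join(qiita_config.working_dir,
                'ebi_submission_%s' % preprocessed_data_id)
```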
self.sequence_files = []
self.study_xml_fp = None
self.sample_xml_fp = None
@@ -166,20 +171,6 @@ def __init__(self, preprocessed_data_id, action):
# This will hold the submission's samples, keyed by the sample name
self.samples = {}

def _stringify_kwargs(self, kwargs_dict):
"""Turns values in a dictionay into strings, None, or self.empty_value
"""
try:
result = {
str(k): str(v) if v is not None else self.empty_value
for k, v in viewitems(kwargs_dict)}
return result
except ValueError:
raise InvalidMetadataError("All additional metadata passed via "
"kwargs to the EBISubmission "
"constructor must be representatable "
"as strings.")

def _get_study_alias(self):
"""Format alias using ``self.preprocessed_data_id``"""
study_alias_format = '%s_ppdid_%s'
@@ -286,7 +277,7 @@ def generate_study_xml(self):
return study_set

def add_sample(self, sample_name, taxon_id, scientific_name,
description, **kwargs):
description):
"""Adds sample information to the current submission

Parameters
@@ -298,7 +289,6 @@ def add_sample(self, sample_name, taxon_id, scientific_name,
scientific_name : str
NCBI's scientific name for the `taxon_id`
description : str

Defaults to ``None``. If not provided, the `empty_value` will be
used for the description

@@ -324,11 +314,6 @@ def add_sample(self, sample_name, taxon_id, scientific_name,
self.samples[sample_name]['description'] = escape(
clean_whitespace(description))

self.samples[sample_name]['attributes'] = self._stringify_kwargs(
kwargs)

self.samples[sample_name]['prep'] = None

def generate_sample_xml(self):
"""Generates the sample XML file

@@ -363,18 +348,11 @@ def generate_sample_xml(self):
description.text = escape(clean_whitespace(
sample_info['description']))

if sample_info['attributes']:
sample_attributes = ET.SubElement(sample, 'SAMPLE_ATTRIBUTES')
self._add_dict_as_tags_and_values(sample_attributes,
'SAMPLE_ATTRIBUTE',
sample_info['attributes'])

return sample_set

def add_sample_prep(self, sample_name, platform, file_type, file_path,
experiment_design_description,
library_construction_protocol,
**kwargs):
library_construction_protocol):
"""Add prep info for an existing sample

Parameters
@@ -395,29 +373,21 @@ def add_sample_prep(self, sample_name, platform, file_type, file_path,
KeyError
If `sample_name` is not in the list of samples in the
``EBISubmission`` object
KeyError
If there is already prep info associated with the specified sample
ValueError
If the platform is not one of the recognized platforms
"""
if sample_name not in self.samples:
raise KeyError("Sample %s: sample has not yet been associated "
"with the submission.")

if self.samples[sample_name]['prep']:
raise KeyError("Sample %s: multiple rows in prep with this sample "
"id!" % sample_name)

platforms = ['LS454', 'ILLUMINA', 'UNKNOWN']
if platform.upper() not in platforms:
raise ValueError("The platform name %s is invalid, must be one of "
"%s (case insensitive)" % (platform,
', '.join(platforms)))

self.sequence_files.append(file_path)
prep_info = self._stringify_kwargs(kwargs)
if prep_info is None:
prep_info = {}
prep_info = {}
prep_info['platform'] = platform
prep_info['file_type'] = file_type
prep_info['file_path'] = file_path
@@ -825,7 +795,7 @@ def add_samples_from_templates(self, sample_template, prep_template,
scientific_name, description))

self.add_sample(sample_name, taxon_id, scientific_name,
description, **sample)
description)

prep_template_samples = []
for prep in iter_file_via_list_of_dicts(prep_template):
@@ -840,8 +810,7 @@ def add_samples_from_templates(self, sample_template, prep_template,
file_path = join(per_sample_fastq_dir, sample_name+'.fastq.gz')
self.add_sample_prep(sample_name, platform, 'fastq',
file_path, experiment_design_description,
library_construction_protocol,
**prep)
library_construction_protocol)

to_remove = set(self.samples).difference(prep_template_samples)
for sample in to_remove:
@@ -981,3 +950,52 @@ def send_xml(self):
print "FAILED"

return (study_accession, submission_accession)

def generate_demultiplexed_fastq(self):
"""Generates demultiplexed fastq

Returns
-------
demux_samples
List of successful demultiplexed samples

Notes
-----
- As a performace feature, this method will check if self.ebi_dir
Contributor: This seems like a fine performance feature, but what would be a way to bypass it? I.e., a case where you know there is something wrong with the generated per-sample FASTQ files and (luckily) your submission failed before. Maybe I am missing something, but it seems like the only solution would be to delete the files manually?

Member Author: If this fails, the code removes the folder, so I really do not see how that can happen. Anyway, if you feel strongly about this, I can add a 'regenerate' parameter, False by default ...

already exist and, if it does, the script will assume that in a
Contributor: exist -> exists

previous execution this step was performed correctly and will simply
read the file names from self.ebi_dir
"""
ppd = self.preprocessed_data

if not isdir(self.ebi_dir):
makedirs(self.ebi_dir)

demux = [path for _, path, ftype in ppd.get_filepaths()
if ftype == 'preprocessed_demux'][0]

demux_samples = set()
with open_file(demux) as demux_fh:
for s, i in to_per_sample_ascii(demux_fh,
self.sample_template.keys()):
sample_fp = join(self.ebi_dir, "%s.fastq.gz" % s)
wrote_sequences = False
with gzopen(sample_fp, 'w') as fh:
for record in i:
fh.write(record)
wrote_sequences = True

if wrote_sequences:
demux_samples.add(s)
else:
remove(sample_fp)

else:
Contributor: Just to make sure I understand what is going on here: is this else covering the case in which the per-sample FASTQs have already been generated? If so, can you add a comment to make that clear?

Member Author: Done, as a Note.

demux_samples = set()
Contributor: I think this whole block could be simplified using glob.glob.

Contributor: Just to be clear, what I mean is that you could do for f in glob.glob(path.join(self.ebi_dir, '*.fastq.gz')): ...

extension = '.fastq.gz'
extension_len = len(extension)
for f in listdir(self.ebi_dir):
if isfile(join(self.ebi_dir, f)) and f.endswith(extension):
demux_samples.add(f[:-extension_len])
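A hedged sketch of that glob-based simplification, written as a standalone helper for brevity; `existing_demux_samples` is a hypothetical name, not code from this PR.

```python
from glob import glob
from os.path import basename, join


def existing_demux_samples(ebi_dir):
    """Sample names for which a per-sample FASTQ already exists in ebi_dir."""
    extension = '.fastq.gz'
    return {basename(fp)[:-len(extension)]
            for fp in glob(join(ebi_dir, '*' + extension))}
```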

return demux_samples
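And a hypothetical sketch of the 'regenerate' flag floated earlier in this thread (not part of this PR): a helper that generate_demultiplexed_fastq could call to decide whether to reuse or rebuild the per-sample FASTQs. `prepare_ebi_dir` is an illustrative name only.

```python
from os import makedirs
from os.path import isdir
from shutil import rmtree


def prepare_ebi_dir(ebi_dir, regenerate=False):
    """Create (or reset) the submission directory.

    Returns True when the per-sample FASTQs must be (re)written, False when
    the files from a previous successful run can simply be reused.
    """
    if isdir(ebi_dir):
        if not regenerate:
            return False
        rmtree(ebi_dir)  # force a clean re-demultiplex
    makedirs(ebi_dir)
    return True
```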