183 changes: 96 additions & 87 deletions qiita_db/analysis.py
@@ -17,15 +17,13 @@
# The full license is in the file LICENSE, distributed with this software.
# -----------------------------------------------------------------------------
from __future__ import division
from collections import defaultdict
from itertools import product
from itertools import product, chain
from os.path import join

from future.utils import viewitems
from biom import load_table
from biom.util import biom_open
import pandas as pd
from skbio.util import find_duplicates

from qiita_core.exceptions import IncompetentQiitaDeveloperError
from qiita_core.qiita_settings import qiita_config
@@ -363,16 +361,13 @@ def samples(self):
Format is {artifact_id: [sample_id, sample_id, ...]}
"""
with qdb.sql_connection.TRN:
sql = """SELECT artifact_id, sample_id
sql = """SELECT artifact_id, array_agg(
sample_id ORDER BY sample_id)
FROM qiita.analysis_sample
WHERE analysis_id = %s
ORDER BY artifact_id"""
ret_samples = defaultdict(list)
GROUP BY artifact_id"""
qdb.sql_connection.TRN.add(sql, [self._id])
# turn into dict of samples keyed to artifact
for pid, sample in qdb.sql_connection.TRN.execute_fetchindex():
ret_samples[pid].append(sample)
return ret_samples
return dict(qdb.sql_connection.TRN.execute_fetchindex())

@property
def dropped_samples(self):
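
The refactor above moves the per-artifact grouping out of Python and into PostgreSQL via array_agg + GROUP BY. A minimal sketch, assuming execute_fetchindex returns rows shaped (artifact_id, [sample_id, ...]) as the new query requests (the ids below are made up), of why the property body collapses to a single dict() call:

# Hypothetical fetchindex output: the grouping is already done by the
# database, so each row is (artifact_id, aggregated sample ids).
rows = [(4, ['1.SKB8.640193', '1.SKD8.640184']),
        (5, ['1.SKB7.640196'])]
samples = dict(rows)
assert samples == {4: ['1.SKB8.640193', '1.SKD8.640184'],
                   5: ['1.SKB7.640196']}

Pushing the grouping into SQL avoids building a defaultdict row by row, which is why the defaultdict import can be dropped entirely.
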
@@ -773,14 +768,20 @@ def remove_samples(self, artifacts=None, samples=None):
qdb.sql_connection.TRN.add(sql, args, many=True)
qdb.sql_connection.TRN.execute()

def build_files(self, rarefaction_depth=None):
def build_files(self,
rarefaction_depth=None,
merge_duplicated_sample_ids=False):
"""Builds biom and mapping files needed for analysis

Parameters
----------
rarefaction_depth : int, optional
Defaults to ``None``. If ``None``, do not rarefy. Otherwise, rarefy
all samples to this number of observations
merge_duplicated_sample_ids : bool, optional
Whether duplicated sample ids in the selected studies should be
merged or prepended with the artifact id. If False (default), the
artifact id is prepended

Raises
------
@@ -802,59 +803,70 @@ def build_files(self, rarefaction_depth=None):
raise ValueError(
"rarefaction_depth must be greater than 0")

samples = self._get_samples()
self._build_mapping_file(samples)
self._build_biom_tables(samples, rarefaction_depth)

def _get_samples(self):
"""Retrieves dict of {artifact_id: [sample_ids]}"""
with qdb.sql_connection.TRN:
sql = """SELECT artifact_id, array_agg(
sample_id ORDER BY sample_id)
FROM qiita.analysis_sample
WHERE analysis_id = %s
GROUP BY artifact_id"""
qdb.sql_connection.TRN.add(sql, [self._id])
return dict(qdb.sql_connection.TRN.execute_fetchindex())

def _build_biom_tables(self, samples, rarefaction_depth):
# in practice we could retrieve the samples in each of the following
# calls, but that would mean hitting the DB multiple times and would
# make testing much harder, as we would need analyses at different
# stages and with possible errors
samples = self.samples
# figure out if we are going to have duplicated samples; doing it
# here because it's computationally cheaper
all_ids = list(chain.from_iterable([
samps for _, samps in viewitems(samples)]))
rename_dup_samples = ((len(all_ids) != len(set(all_ids))) and
not merge_duplicated_sample_ids)

self._build_mapping_file(samples, rename_dup_samples)
self._build_biom_tables(samples, rarefaction_depth,
rename_dup_samples)

def _build_biom_tables(self, samples, rarefaction_depth=None,
rename_dup_samples=False):
"""Build tables and add them to the analysis"""
with qdb.sql_connection.TRN:
# filter and combine all study BIOM tables needed for
# each data type
new_tables = {dt: None for dt in self.data_types}
base_fp = qdb.util.get_work_base_dir()
for a_id, samps in viewitems(samples):
# one biom table attached to each artifact object
artifact = qdb.artifact.Artifact(a_id)
table_fp = None

# this assumes that there is only one reference/pipeline for each
# data_type issue #164
new_tables = {dt: None for dt in self.data_types}
for aid, samps in viewitems(samples):
artifact = qdb.artifact.Artifact(aid)
# this is not checking the reference used for picking
# issue #164
biom_table_fp = None
for _, fp, fp_type in artifact.filepaths:
if fp_type == 'biom':
table_fp = fp
biom_table_fp = fp
break
if not table_fp:
if not biom_table_fp:
raise RuntimeError(
"Artifact %s do not have a biom table associated"
% a_id)
table = load_table(table_fp)
# HACKY WORKAROUND FOR DEMO. Issue # 246
# make sure samples not in biom table are not filtered for
table_samps = set(table.ids())
filter_samps = table_samps.intersection(samps)
# add the metadata column for study the samples come from
study_meta = {'Study': artifact.study.title,
'Processed_id': artifact.id}
samples_meta = {sid: study_meta for sid in filter_samps}
# filter for just the wanted samples and merge into new table
# this if/else setup avoids needing a blank table to
# start merges
table.filter(filter_samps, axis='sample', inplace=True)
table.add_metadata(samples_meta, axis='sample')
% aid)
biom_table = load_table(biom_table_fp)
# filtering samples to keep those selected by the user
biom_table_samples = set(biom_table.ids())
selected_samples = biom_table_samples.intersection(samps)
biom_table.filter(selected_samples, axis='sample',
inplace=True)

if rename_dup_samples:
ids_map = {_id: "%d.%s" % (aid, _id)
for _id in biom_table.ids()}
biom_table.update_ids(ids_map, 'sample', True, True)

# add a metadata column for the study the samples come from;
# this is useful in case the user downloads the biom files
study_md = {'Study': artifact.study.title, 'Artifact_id': aid}
samples_md = {sid: study_md for sid in selected_samples}
biom_table.add_metadata(samples_md, axis='sample')
data_type = artifact.data_type

# this is not checking the reference used for picking
# issue #164
if new_tables[data_type] is None:
new_tables[data_type] = table
new_tables[data_type] = biom_table
else:
new_tables[data_type] = new_tables[data_type].merge(table)
new_tables[data_type] = \
new_tables[data_type].merge(biom_table)

# add the new tables to the analysis
_, base_fp = qdb.util.get_mountpoint(self._table)[0]
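
To see what the rename-before-merge branch does, here is a hedged, self-contained sketch with toy tables; the artifact ids (4 and 5) and all sample/observation ids are invented, and only the update_ids/merge calls mirror the code above:

import numpy as np
from biom import Table

# Two toy tables standing in for two artifacts' biom files; 'S1' is
# duplicated across them.
t4 = Table(np.array([[1, 2], [3, 4]]), ['O1', 'O2'], ['S1', 'S2'])
t5 = Table(np.array([[5, 6], [7, 8]]), ['O1', 'O2'], ['S1', 'S3'])

# Prefix every sample id with its artifact id so the merge keeps the
# duplicated samples as separate columns instead of combining them.
for aid, t in ((4, t4), (5, t5)):
    t.update_ids({i: '%d.%s' % (aid, i) for i in t.ids()},
                 'sample', True, True)

merged = t4.merge(t5)
print(sorted(merged.ids()))  # ['4.S1', '4.S2', '5.S1', '5.S3']

Without the renaming, biom's merge would combine the two 'S1' columns into one, which is the merge_duplicated_sample_ids=True behaviour.
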
@@ -870,60 +882,57 @@ def _build_biom_tables(self, samples, rarefaction_depth):
self._add_file("%d_analysis_%s.biom" % (self._id, dt),
"biom", data_type=dt)

def _build_mapping_file(self, samples):
def _build_mapping_file(self, samples, rename_dup_samples=False):
"""Builds the combined mapping file for all samples
Code modified slightly from qiime.util.MetadataMap.__add__"""
with qdb.sql_connection.TRN:
all_sample_ids = set()
sql = """SELECT filepath_id, filepath
# query to get the latest qiime mapping file
sql = """SELECT filepath
FROM qiita.filepath
JOIN qiita.prep_template_filepath USING (filepath_id)
JOIN qiita.prep_template USING (prep_template_id)
JOIN qiita.filepath_type USING (filepath_type_id)
WHERE filepath_type = 'qiime_map'
AND artifact_id IN (SELECT *
FROM qiita.find_artifact_roots(%s))
ORDER BY filepath_id DESC"""
_id, fp = qdb.util.get_mountpoint('templates')[0]
ORDER BY filepath_id DESC LIMIT 1"""
_, fp = qdb.util.get_mountpoint('templates')[0]
Contributor: And this is the same as fp = qdb.util.get_mountpoint('templates')[0][0]. Probably worth adding a comment saying that we are only interested in the filepath, not in the id.

Member (author): I think that's clear from the code, as we are discarding _. Not sure this is needed. Do you feel strongly about it?

Contributor: Nope
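
For readers following this thread, a tiny sketch of the two spellings, using a stub whose (id, filepath) tuple shape is assumed from the unpacking in the diff (all values are made up):

# Stub standing in for qdb.util.get_mountpoint('templates').
def get_mountpoint_stub(name):
    return [(7, '/mnt/qiita/templates')]  # hypothetical (id, filepath)

_, fp = get_mountpoint_stub('templates')[0]          # style used in the diff
fp_indexed = get_mountpoint_stub('templates')[0][1]  # equivalent indexing
assert fp == fp_indexed == '/mnt/qiita/templates'
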


all_ids = set()
to_concat = []
for aid, samps in viewitems(samples):
qdb.sql_connection.TRN.add(sql, [aid])
qm_fp = qdb.sql_connection.TRN.execute_fetchindex()[0][0]

for pid, samples in viewitems(samples):
if len(samples) != len(set(samples)):
duplicates = find_duplicates(samples)
raise qdb.exceptions.QiitaDBError(
"Duplicate sample ids found: %s"
% ', '.join(duplicates))
# Get the QIIME mapping file
qdb.sql_connection.TRN.add(sql, [pid])
qiime_map_fp = \
qdb.sql_connection.TRN.execute_fetchindex()[0][1]
# Parse the mapping file
qiime_map = pd.read_csv(
join(fp, qiime_map_fp), sep='\t', keep_default_na=False,
na_values=['unknown'], index_col=False,
converters=defaultdict(lambda: str))
qiime_map.set_index('#SampleID', inplace=True, drop=True)
qiime_map = qiime_map.loc[samples]

duplicates = all_sample_ids.intersection(qiime_map.index)
if duplicates or len(samples) != len(set(samples)):
# Duplicate samples so raise error
raise qdb.exceptions.QiitaDBError(
"Duplicate sample ids found: %s"
% ', '.join(duplicates))
all_sample_ids.update(qiime_map.index)
to_concat.append(qiime_map)
qm = qdb.metadata_template.util.load_template_to_dataframe(
join(fp, qm_fp), index='#SampleID')

# if we are not going to merge the duplicated samples,
# prepend the aid to the sample name
if rename_dup_samples:
qm['original_SampleID'] = qm.index
qm['#SampleID'] = "%d." % aid + qm.index
qm['qiita_aid'] = aid
samps = ['%d.%s' % (aid, _id) for _id in samps]
qm.set_index('#SampleID', inplace=True, drop=True)
else:
samps = set(samps) - all_ids
all_ids.update(samps)

qm = qm.loc[samps]
to_concat.append(qm)

merged_map = pd.concat(to_concat)

# forcing QIIME column order
cols = merged_map.columns.values.tolist()
cols.remove('BarcodeSequence')
cols.remove('LinkerPrimerSequence')
cols.remove('Description')
new_cols = ['BarcodeSequence', 'LinkerPrimerSequence']
new_cols.extend(cols)
new_cols.append('Description')
merged_map = merged_map[new_cols]
cols = (['BarcodeSequence', 'LinkerPrimerSequence'] + cols +
['Description'])
merged_map = merged_map[cols]

# Save the mapping file
_, base_fp = qdb.util.get_mountpoint(self._table)[0]
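As a companion to the column reordering at the end of _build_mapping_file, a hedged sketch on a toy frame (the 'ph' column and all values are invented) showing the forced QIIME order, with BarcodeSequence and LinkerPrimerSequence first and Description last:

import pandas as pd

# Toy mapping frame with the columns in a scrambled starting order.
df = pd.DataFrame([['sample one', 7.2, 'ACGT', 'GGCC']],
                  columns=['Description', 'ph', 'BarcodeSequence',
                           'LinkerPrimerSequence'])

# Pull the three fixed columns out, then rebuild the order around the rest.
cols = df.columns.values.tolist()
for name in ('BarcodeSequence', 'LinkerPrimerSequence', 'Description'):
    cols.remove(name)
df = df[['BarcodeSequence', 'LinkerPrimerSequence'] + cols + ['Description']]
print(df.columns.tolist())
# ['BarcodeSequence', 'LinkerPrimerSequence', 'ph', 'Description']
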
8 changes: 5 additions & 3 deletions qiita_db/support_files/populate_test_db.sql
@@ -326,13 +326,15 @@ INSERT INTO qiita.artifact (name, generated_timestamp, command_id, command_param
3, 6, 2, TRUE, TRUE),
('Demultiplexed 2', 'Mon Oct 1 11:30:27 2012', 1, '{"max_bad_run_length":3,"min_per_read_length_fraction":0.75,"sequence_max_n":0,"rev_comp_barcode":false,"rev_comp_mapping_barcodes":true,"rev_comp":false,"phred_quality_threshold":3,"barcode_type":"golay_12","max_barcode_errors":1.5,"input_data":1}'::json,
3, 6, 2, TRUE, TRUE),
('BIOM', 'Tue Oct 2 17:30:00 2012', 3, '{"reference":1,"sortmerna_e_value":1,"sortmerna_max_pos":10000,"similarity":0.97,"sortmerna_coverage":0.97,"threads":1,"input_data":2}'::json,
3, 7, 2, FALSE, FALSE),
('BIOM', 'Tue Oct 2 17:30:00 2012', 3, '{"reference":1,"sortmerna_e_value":1,"sortmerna_max_pos":10000,"similarity":0.97,"sortmerna_coverage":0.97,"threads":1,"input_data":2}'::json,
3, 7, 2, FALSE, FALSE);

-- Link the child artifacts with their parents artifacts
INSERT INTO qiita.parent_artifact (parent_id, artifact_id)
VALUES (1, 2), (1, 3),
(2, 4);
(2, 4), (2, 5);

-- Insert the jobs that processed the previous artifacts
INSERT INTO qiita.processing_job (processing_job_id, email, command_id, command_parameters, processing_job_status_id)
@@ -362,14 +364,14 @@ INSERT INTO qiita.filepath (filepath, filepath_type_id, checksum, checksum_algor
INSERT INTO qiita.artifact_filepath (artifact_id, filepath_id)
VALUES (1, 1), (1, 2),
(2, 3), (2, 4), (2, 5),
(4, 9);
(4, 9), (5, 9);

-- Link the artifact with the prep template
UPDATE qiita.prep_template SET artifact_id = 1 WHERE prep_template_id = 1;

-- Link the study with the artifacts
INSERT INTO qiita.study_artifact (study_id, artifact_id)
VALUES (1, 1), (1, 2), (1, 3), (1, 4);
VALUES (1, 1), (1, 2), (1, 3), (1, 4), (1, 5);

-- Insert EBI information for artifact 2
INSERT INTO qiita.ebi_run_accession (sample_id, artifact_id, ebi_run_accession)