Merged
179 changes: 95 additions & 84 deletions qiita_db/analysis.py
@@ -25,7 +25,7 @@
from biom import load_table
from biom.util import biom_open
import pandas as pd
from skbio.util import find_duplicates
from skbio.util import flatten
Contributor

flatten has been removed in later versions of skbio. In other parts of the code, we have been using the construction below in place of flatten:

from itertools import chain
list(chain.from_iterable(ITERABLE))
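For instance, a minimal standalone sketch of that construction (the artifact and sample ids are invented for illustration):

from itertools import chain

# {artifact_id: [sample_ids]}, mirroring the structure used in this PR
samples = {4: ['1.SKB8.640193', '1.SKD8.640184'],
           5: ['1.SKB8.640193']}
all_ids = list(chain.from_iterable(samples.values()))
# ['1.SKB8.640193', '1.SKD8.640184', '1.SKB8.640193']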

Member Author

K


from qiita_core.exceptions import IncompetentQiitaDeveloperError
from qiita_core.qiita_settings import qiita_config
@@ -363,16 +363,13 @@ def samples(self):
Format is {artifact_id: [sample_id, sample_id, ...]}
"""
with qdb.sql_connection.TRN:
sql = """SELECT artifact_id, sample_id
sql = """SELECT artifact_id, array_agg(
sample_id ORDER BY sample_id)
FROM qiita.analysis_sample
WHERE analysis_id = %s
ORDER BY artifact_id"""
ret_samples = defaultdict(list)
GROUP BY artifact_id"""
qdb.sql_connection.TRN.add(sql, [self._id])
# turn into dict of samples keyed to artifact
for pid, sample in qdb.sql_connection.TRN.execute_fetchindex():
ret_samples[pid].append(sample)
return ret_samples
return dict(qdb.sql_connection.TRN.execute_fetchindex())
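For illustration, a hedged sketch of why dict() can replace the old defaultdict loop once the query aggregates with array_agg (row values invented):

# Each fetched row is now (artifact_id, [sample_ids]), so dict() suffices
rows = [(4, ['1.SKB8.640193', '1.SKD8.640184']),
        (5, ['1.SKB8.640193'])]
assert dict(rows) == {4: ['1.SKB8.640193', '1.SKD8.640184'],
                      5: ['1.SKB8.640193']}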

@property
def dropped_samples(self):
@@ -773,14 +770,20 @@ def remove_samples(self, artifacts=None, samples=None):
qdb.sql_connection.TRN.add(sql, args, many=True)
qdb.sql_connection.TRN.execute()

def build_files(self, rarefaction_depth=None):
def build_files(self,
rarefaction_depth=None,
merge_duplicated_sample_ids=False):
"""Builds biom and mapping files needed for analysis

Parameters
----------
rarefaction_depth : int, optional
Defaults to ``None``. If ``None``, do not rarefy. Otherwise, rarefy
all samples to this number of observations
merge_duplicated_sample_ids : bool, optional
Whether the duplicated sample ids in the selected studies should be
merged or prepended with the artifact ids. False (default) prepends
the artifact id

Raises
------
@@ -802,59 +805,69 @@ def build_files(self, rarefaction_depth=None):
raise ValueError(
"rarefaction_depth must be greater than 0")

samples = self._get_samples()
self._build_mapping_file(samples)
self._build_biom_tables(samples, rarefaction_depth)

def _get_samples(self):
"""Retrieves dict of {artifact_id: [sample_ids]}"""
with qdb.sql_connection.TRN:
sql = """SELECT artifact_id, array_agg(
sample_id ORDER BY sample_id)
FROM qiita.analysis_sample
WHERE analysis_id = %s
GROUP BY artifact_id"""
qdb.sql_connection.TRN.add(sql, [self._id])
return dict(qdb.sql_connection.TRN.execute_fetchindex())

def _build_biom_tables(self, samples, rarefaction_depth):
# in practice we could retrieve samples in each of the following
# calls but this will mean calling the DB multiple times and will
# make testing much harder as we will need to have analyses at
# different stages and possible errors.
samples = self.samples
# figuring out if we are going to have duplicated samples, again
# doing it here cause it's computetional cheaper
Contributor

computetional -> computational

all_ids = flatten([samps for _, samps in viewitems(samples)])
duplicated_samples = ((len(all_ids) != len(set(all_ids))) and
merge_duplicated_sample_ids)

self._build_mapping_file(samples, duplicated_samples)
self._build_biom_tables(samples, rarefaction_depth,
duplicated_samples)

def _build_biom_tables(self, samples, rarefaction_depth=None,
duplicated_samples=False):
"""Build tables and add them to the analysis"""
with qdb.sql_connection.TRN:
# filter and combine all study BIOM tables needed for
# each data type
new_tables = {dt: None for dt in self.data_types}
base_fp = qdb.util.get_work_base_dir()
for a_id, samps in viewitems(samples):
# one biom table attached to each artifact object
artifact = qdb.artifact.Artifact(a_id)
table_fp = None

# this assumes that there is only one refererence/pipeline for each
Contributor

refererence -> reference

# data_type issue #164
new_tables = {dt: None for dt in self.data_types}
for aid, samps in viewitems(samples):
artifact = qdb.artifact.Artifact(aid)
# this is not checking the reference used for picking
# issue #164
biom_table_fp = None
for _, fp, fp_type in artifact.filepaths:
if fp_type == 'biom':
table_fp = fp
biom_table_fp = fp
break
if not table_fp:
if not biom_table_fp:
raise RuntimeError(
"Artifact %s do not have a biom table associated"
% a_id)
table = load_table(table_fp)
# HACKY WORKAROUND FOR DEMO. Issue # 246
# make sure samples not in biom table are not filtered for
table_samps = set(table.ids())
filter_samps = table_samps.intersection(samps)
# add the metadata column for study the samples come from
study_meta = {'Study': artifact.study.title,
'Processed_id': artifact.id}
samples_meta = {sid: study_meta for sid in filter_samps}
# filter for just the wanted samples and merge into new table
# this if/else setup avoids needing a blank table to
# start merges
table.filter(filter_samps, axis='sample', inplace=True)
table.add_metadata(samples_meta, axis='sample')
% aid)
biom_table = load_table(biom_table_fp)
# filtering samples to keep those selected by the user
biom_table_samples = set(biom_table.ids())
selected_samples = biom_table_samples.intersection(samps)
biom_table.filter(selected_samples, axis='sample',
inplace=True)

if not duplicated_samples:
Contributor

I'm a bit confused by the name of this variable. If duplicated_samples == True then we rename samples? Can this be changed to rename_duplicated_samples? I'm just thinking of the case where we have duplicated samples but we want to merge them - the name of this variable is a bit misleading.

Member Author

I will rename to rename_dup_samples
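For illustration, a minimal standalone sketch of the prepending step performed in the diff below (the toy table and ids are invented):

import numpy as np
from biom import Table

# Toy table: two observations x two samples (invented data)
biom_table = Table(np.array([[1, 2], [3, 4]]), ['O1', 'O2'], ['S1', 'S2'])
aid = 4
ids_map = {_id: "%d.%s" % (aid, _id) for _id in biom_table.ids()}
biom_table.update_ids(ids_map, 'sample', True, True)  # strict, in place
print(list(biom_table.ids()))  # ['4.S1', '4.S2']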

ids_map = {_id: "%d.%s" % (aid, _id)
for _id in biom_table.ids()}
biom_table.update_ids(ids_map, 'sample', True, True)

# add the metadata column for study the samples come from,
# this is useful in case the user downloads the bioms
study_md = {'Study': artifact.study.title, 'Processed_id': aid}
Contributor

Should this be artifact_id instead of processed_id?

Member Author

Yup, note that this was a leftover from when we moved to artifacts, and we didn't change it then to avoid moving things around. Changing now.
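A hedged sketch of the metadata dict after that rename (key name per the question above; artifact and aid come from the surrounding loop in the diff):

# Hypothetical: same dict as in the diff, with the clearer key name
study_md = {'Study': artifact.study.title, 'Artifact_id': aid}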

samples_md = {sid: study_md for sid in selected_samples}
biom_table.add_metadata(samples_md, axis='sample')
data_type = artifact.data_type

# this is not checking the reference used for picking
# issue #164
if new_tables[data_type] is None:
new_tables[data_type] = table
new_tables[data_type] = biom_table
else:
new_tables[data_type] = new_tables[data_type].merge(table)
new_tables[data_type] = \
new_tables[data_type].merge(biom_table)

# add the new tables to the analysis
_, base_fp = qdb.util.get_mountpoint(self._table)[0]
@@ -870,11 +883,11 @@ def _build_biom_tables(self, samples, rarefaction_depth):
self._add_file("%d_analysis_%s.biom" % (self._id, dt),
"biom", data_type=dt)

def _build_mapping_file(self, samples):
def _build_mapping_file(self, samples, duplicated_samples=False):
"""Builds the combined mapping file for all samples
Code modified slightly from qiime.util.MetadataMap.__add__"""
with qdb.sql_connection.TRN:
all_sample_ids = set()
# query to get the latest qiime mapping file
sql = """SELECT filepath_id, filepath
FROM qiita.filepath
JOIN qiita.prep_template_filepath USING (filepath_id)
@@ -883,47 +896,45 @@ def _build_mapping_file(self, samples):
WHERE filepath_type = 'qiime_map'
AND artifact_id IN (SELECT *
FROM qiita.find_artifact_roots(%s))
ORDER BY filepath_id DESC"""
_id, fp = qdb.util.get_mountpoint('templates')[0]
to_concat = []
ORDER BY filepath_id DESC LIMIT 1"""
_, fp = qdb.util.get_mountpoint('templates')[0]
Contributor

And this is the same as fp = qdb.util.get_mountpoint('templates')[0][1]. Probably worth adding a comment saying that we are only interested in the filepath but not the id.

Member Author

I think that's clear from the code as we are discarding _. Not sure this is needed. Do you feel strongly about it?

Contributor

Nope
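A quick runnable sketch of the equivalence under discussion, assuming get_mountpoint returns a list of (id, filepath) tuples as the unpacking in the diff implies:

mounts = [(1, '/mnt/qiita/templates')]  # stand-in for get_mountpoint('templates')
_, fp = mounts[0]
assert fp == mounts[0][1]  # the filepath is the second element, not [0][0]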


for pid, samples in viewitems(samples):
if len(samples) != len(set(samples)):
duplicates = find_duplicates(samples)
raise qdb.exceptions.QiitaDBError(
"Duplicate sample ids found: %s"
% ', '.join(duplicates))
# Get the QIIME mapping file
all_ids = set()
to_concat = []
for pid, samps in viewitems(samples):
qdb.sql_connection.TRN.add(sql, [pid])
qiime_map_fp = \
qdb.sql_connection.TRN.execute_fetchindex()[0][1]
qm_fp = qdb.sql_connection.TRN.execute_fetchindex()[0][1]
Contributor

Just realized - if you are accessing only [0][1], doesn't it mean that the above query is retrieving more info than needed? I think it should only be SELECT filepath and then here you can use execute_fetchlast. I think it will make the code cleaner and also remove the magic numbers.

Member Author

Good point.
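A hedged sketch of that refactor (execute_fetchlast is taken from the comment above; the elided clauses are unchanged from the diff):

sql = """SELECT filepath
         FROM qiita.filepath
         JOIN qiita.prep_template_filepath USING (filepath_id)
         -- ... same remaining joins and filters as in the diff ...
         ORDER BY filepath_id DESC LIMIT 1"""
qdb.sql_connection.TRN.add(sql, [pid])
qm_fp = qdb.sql_connection.TRN.execute_fetchlast()  # single value, no [0][1]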


# Parse the mapping file
qiime_map = pd.read_csv(
join(fp, qiime_map_fp), sep='\t', keep_default_na=False,
na_values=['unknown'], index_col=False,
converters=defaultdict(lambda: str))
qiime_map.set_index('#SampleID', inplace=True, drop=True)
qiime_map = qiime_map.loc[samples]

duplicates = all_sample_ids.intersection(qiime_map.index)
if duplicates or len(samples) != len(set(samples)):
# Duplicate samples so raise error
raise qdb.exceptions.QiitaDBError(
"Duplicate sample ids found: %s"
% ', '.join(duplicates))
all_sample_ids.update(qiime_map.index)
to_concat.append(qiime_map)
qm = pd.read_csv(join(fp, qm_fp), sep='\t',
Contributor

can you use load_template_to_dataframe in qiita_db.metadata_template.util instead of read_csv? This will help to centralize the parsing of these files, and that function also supports parsing QIIME mapping files (pass the parameter index='#SampleID').

keep_default_na=False, na_values=['unknown'],
index_col=False,
converters=defaultdict(lambda: str))
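A hedged sketch of the reviewer's suggestion above (the function path and index parameter come from the comment; the exact signature is assumed):

from qiita_db.metadata_template.util import load_template_to_dataframe

# Centralizes parsing and already handles QIIME mapping files
qm = load_template_to_dataframe(join(fp, qm_fp), index='#SampleID')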

# if we are not going to merge the duplicated samples
# append the pid to the sample name
if duplicated_samples:
Contributor

Same comment as above regarding the name of the variable. Also it looks like it works the other way around here? If duplicated_samples == True they don't get renamed, while above they do get renamed.

Member Author

After discussing offline we realized that this is correct but the name is misleading.

samps = set(samps) - all_ids
all_ids.update(samps)
else:
qm['#SampleID'] = qm['#SampleID'] + ".%d" % pid
Contributor

Probably I remember this wrong - but I think that we agreed it was better to prefix the samples with the pid rather than suffix? @ElDeveloper do you remember what we agreed on here?

Contributor

Yes, that's also what I recall, specifically in support of the argument
that all the information would be grouped together at the beginning of
the sample name. And users would still be able to use the "original
sample id" column just in case.


Member Author

K, fixing.
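Following the agreement above, a hedged sketch of the prefix form (the diff below still shows the suffix form the author agreed to fix):

# Prefix the pid instead of suffixing it, per the discussion
qm['#SampleID'] = ('%d.' % pid) + qm['#SampleID']
samps = ['%d.%s' % (pid, _id) for _id in samps]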

qm['qiita_pid'] = pid
samps = ['%s.%d' % (_id, pid) for _id in samps]

qm.set_index('#SampleID', inplace=True, drop=True)
qm = qm.loc[samps]
to_concat.append(qm)

merged_map = pd.concat(to_concat)

# forcing QIIME column order
cols = merged_map.columns.values.tolist()
cols.remove('BarcodeSequence')
cols.remove('LinkerPrimerSequence')
cols.remove('Description')
new_cols = ['BarcodeSequence', 'LinkerPrimerSequence']
new_cols.extend(cols)
new_cols.append('Description')
merged_map = merged_map[new_cols]
cols = (['BarcodeSequence', 'LinkerPrimerSequence'] + cols +
['Description'])
merged_map = merged_map[cols]

# Save the mapping file
_, base_fp = qdb.util.get_mountpoint(self._table)[0]
8 changes: 5 additions & 3 deletions qiita_db/support_files/populate_test_db.sql
@@ -326,13 +326,15 @@ INSERT INTO qiita.artifact (name, generated_timestamp, command_id, command_param
3, 6, 2, TRUE, TRUE),
('Demultiplexed 2', 'Mon Oct 1 11:30:27 2012', 1, '{"max_bad_run_length":3,"min_per_read_length_fraction":0.75,"sequence_max_n":0,"rev_comp_barcode":false,"rev_comp_mapping_barcodes":true,"rev_comp":false,"phred_quality_threshold":3,"barcode_type":"golay_12","max_barcode_errors":1.5,"input_data":1}'::json,
3, 6, 2, TRUE, TRUE),
('BIOM', 'Tue Oct 2 17:30:00 2012', 3, '{"reference":1,"sortmerna_e_value":1,"sortmerna_max_pos":10000,"similarity":0.97,"sortmerna_coverage":0.97,"threads":1,"input_data":2}'::json,
3, 7, 2, FALSE, FALSE),
('BIOM', 'Tue Oct 2 17:30:00 2012', 3, '{"reference":1,"sortmerna_e_value":1,"sortmerna_max_pos":10000,"similarity":0.97,"sortmerna_coverage":0.97,"threads":1,"input_data":2}'::json,
3, 7, 2, FALSE, FALSE);

-- Link the child artifacts with their parents artifacts
INSERT INTO qiita.parent_artifact (parent_id, artifact_id)
VALUES (1, 2), (1, 3),
(2, 4);
(2, 4), (2, 5);

-- Insert the jobs that processed the previous artifacts
INSERT INTO qiita.processing_job (processing_job_id, email, command_id, command_parameters, processing_job_status_id)
@@ -362,14 +364,14 @@ INSERT INTO qiita.filepath (filepath, filepath_type_id, checksum, checksum_algor
INSERT INTO qiita.artifact_filepath (artifact_id, filepath_id)
VALUES (1, 1), (1, 2),
(2, 3), (2, 4), (2, 5),
(4, 9);
(4, 9), (5, 9);

-- Link the artifact with the prep template
UPDATE qiita.prep_template SET artifact_id = 1 WHERE prep_template_id = 1;

-- Link the study with the artifacts
INSERT INTO qiita.study_artifact (study_id, artifact_id)
VALUES (1, 1), (1, 2), (1, 3), (1, 4);
VALUES (1, 1), (1, 2), (1, 3), (1, 4), (1, 5);

-- Insert EBI information for artifact 2
INSERT INTO qiita.ebi_run_accession (sample_id, artifact_id, ebi_run_accession)