fix #1236 #1644
Changes from 1 commit
@@ -25,7 +25,7 @@
 from biom import load_table
 from biom.util import biom_open
 import pandas as pd
-from skbio.util import find_duplicates
+from skbio.util import flatten

 from qiita_core.exceptions import IncompetentQiitaDeveloperError
 from qiita_core.qiita_settings import qiita_config
@@ -363,16 +363,13 @@ def samples(self):
         Format is {artifact_id: [sample_id, sample_id, ...]}
         """
         with qdb.sql_connection.TRN:
-            sql = """SELECT artifact_id, sample_id
+            sql = """SELECT artifact_id, array_agg(
+                        sample_id ORDER BY sample_id)
                      FROM qiita.analysis_sample
                      WHERE analysis_id = %s
-                     ORDER BY artifact_id"""
-            ret_samples = defaultdict(list)
+                     GROUP BY artifact_id"""
             qdb.sql_connection.TRN.add(sql, [self._id])
-            # turn into dict of samples keyed to artifact
-            for pid, sample in qdb.sql_connection.TRN.execute_fetchindex():
-                ret_samples[pid].append(sample)
-            return ret_samples
+            return dict(qdb.sql_connection.TRN.execute_fetchindex())

     @property
     def dropped_samples(self):
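For context on the SQL change above: `array_agg` plus `GROUP BY` pushes the per-artifact grouping into PostgreSQL, so the Python-side accumulation loop disappears. A minimal sketch of the two result shapes, with invented row data standing in for what `execute_fetchindex` would return:

```python
from collections import defaultdict

# rows as the old query returned them: one (artifact_id, sample_id) pair per row
old_rows = [(4, "1.SKB8.640193"), (4, "1.SKD8.640184"), (5, "1.SKB8.640193")]

# old approach: group the rows in Python
ret_samples = defaultdict(list)
for pid, sample in old_rows:
    ret_samples[pid].append(sample)

# rows as the array_agg/GROUP BY query returns them: one row per artifact,
# with the sample ids already collected into a sorted list
new_rows = [(4, ["1.SKB8.640193", "1.SKD8.640184"]), (5, ["1.SKB8.640193"])]

assert dict(new_rows) == dict(ret_samples)
```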
@@ -773,14 +770,20 @@ def remove_samples(self, artifacts=None, samples=None):
             qdb.sql_connection.TRN.add(sql, args, many=True)
             qdb.sql_connection.TRN.execute()

-    def build_files(self, rarefaction_depth=None):
+    def build_files(self,
+                    rarefaction_depth=None,
+                    merge_duplicated_sample_ids=False):
         """Builds biom and mapping files needed for analysis

         Parameters
         ----------
         rarefaction_depth : int, optional
             Defaults to ``None``. If ``None``, do not rarefy. Otherwise, rarefy
             all samples to this number of observations
+        merge_duplicated_sample_ids : bool, optional
+            Whether duplicated sample ids across the selected studies should
+            be merged or prepended with the artifact id. False (default)
+            prepends the artifact id

         Raises
         ------
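A small illustration of the new flag's semantics (artifact and sample ids invented for the example): with the default `merge_duplicated_sample_ids=False`, a sample id selected under two artifacts is kept twice and disambiguated with the artifact id; with `True`, the duplicates collapse into a single sample when the tables and mapping file are merged.

```python
# hypothetical selection: the same sample id appears under two artifacts
samples = {4: ["SKB8.640193"], 5: ["SKB8.640193", "SKD8.640184"]}

# merge_duplicated_sample_ids=False (default): prepend the artifact id,
# so both occurrences survive as distinct samples
prefixed = {aid: ["%d.%s" % (aid, sid) for sid in sids]
            for aid, sids in samples.items()}
# {4: ['4.SKB8.640193'], 5: ['5.SKB8.640193', '5.SKD8.640184']}

# merge_duplicated_sample_ids=True: leave the ids untouched so the duplicated
# sample collapses into one column/row during the merge
```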
@@ -802,59 +805,69 @@ def build_files(self, rarefaction_depth=None):
                 raise ValueError(
                     "rarefaction_depth must be greater than 0")

-        samples = self._get_samples()
-        self._build_mapping_file(samples)
-        self._build_biom_tables(samples, rarefaction_depth)
-
-    def _get_samples(self):
-        """Retrieves dict of {artifact_id: [sample_ids]}"""
-        with qdb.sql_connection.TRN:
-            sql = """SELECT artifact_id, array_agg(
-                        sample_id ORDER BY sample_id)
-                     FROM qiita.analysis_sample
-                     WHERE analysis_id = %s
-                     GROUP BY artifact_id"""
-            qdb.sql_connection.TRN.add(sql, [self._id])
-            return dict(qdb.sql_connection.TRN.execute_fetchindex())
+        # in practice we could retrieve samples in each of the following
+        # calls but this will mean calling the DB multiple times and will
+        # make testing much harder as we will need to have analyses at
+        # different stages and possible errors.
+        samples = self.samples
+        # figuring out if we are going to have duplicated samples; again,
+        # doing it here because it's computationally cheaper
+        all_ids = flatten([samps for _, samps in viewitems(samples)])
+        duplicated_samples = ((len(all_ids) != len(set(all_ids))) and
+                              merge_duplicated_sample_ids)
+
+        self._build_mapping_file(samples, duplicated_samples)
+        self._build_biom_tables(samples, rarefaction_depth,
+                                duplicated_samples)
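As a standalone sketch of the duplicate check introduced above (dictionary contents invented; `itertools.chain` stands in for skbio's `flatten`, which shallow-flattens a list of lists):

```python
from itertools import chain

# invented selection: 'SKB8.640193' appears under both artifacts 4 and 5
samples = {4: ['SKB8.640193', 'SKD8.640184'], 5: ['SKB8.640193']}

# equivalent of flatten([...]) over the per-artifact sample lists
all_ids = list(chain.from_iterable(samples.values()))

# duplicates exist when the flat list is longer than its set of unique ids
has_duplicates = len(all_ids) != len(set(all_ids))  # True here

# the files are only built in "merge" mode when the caller asked for it
merge_duplicated_sample_ids = True
duplicated_samples = has_duplicates and merge_duplicated_sample_ids
```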
-    def _build_biom_tables(self, samples, rarefaction_depth):
+    def _build_biom_tables(self, samples, rarefaction_depth=None,
+                           duplicated_samples=False):
         """Build tables and add them to the analysis"""
         with qdb.sql_connection.TRN:
-            # filter and combine all study BIOM tables needed for
-            # each data type
-            new_tables = {dt: None for dt in self.data_types}
-            base_fp = qdb.util.get_work_base_dir()
-            for a_id, samps in viewitems(samples):
-                # one biom table attached to each artifact object
-                artifact = qdb.artifact.Artifact(a_id)
-                table_fp = None
+            # this assumes that there is only one reference/pipeline for each
+            # data_type, issue #164
+            new_tables = {dt: None for dt in self.data_types}
+            for aid, samps in viewitems(samples):
+                artifact = qdb.artifact.Artifact(aid)
+                # this is not checking the reference used for picking
+                # issue #164
+                biom_table_fp = None
                 for _, fp, fp_type in artifact.filepaths:
                     if fp_type == 'biom':
-                        table_fp = fp
+                        biom_table_fp = fp
                         break
-                if not table_fp:
+                if not biom_table_fp:
                     raise RuntimeError(
                         "Artifact %s do not have a biom table associated"
-                        % a_id)
-                table = load_table(table_fp)
-                # HACKY WORKAROUND FOR DEMO. Issue # 246
-                # make sure samples not in biom table are not filtered for
-                table_samps = set(table.ids())
-                filter_samps = table_samps.intersection(samps)
-                # add the metadata column for study the samples come from
-                study_meta = {'Study': artifact.study.title,
-                              'Processed_id': artifact.id}
-                samples_meta = {sid: study_meta for sid in filter_samps}
-                # filter for just the wanted samples and merge into new table
-                # this if/else setup avoids needing a blank table to
-                # start merges
-                table.filter(filter_samps, axis='sample', inplace=True)
-                table.add_metadata(samples_meta, axis='sample')
+                        % aid)
+                biom_table = load_table(biom_table_fp)
+                # filtering samples to keep those selected by the user
+                biom_table_samples = set(biom_table.ids())
+                selected_samples = biom_table_samples.intersection(samps)
+                biom_table.filter(selected_samples, axis='sample',
+                                  inplace=True)
+
+                if not duplicated_samples:
+                    ids_map = {_id: "%d.%s" % (aid, _id)
+                               for _id in biom_table.ids()}
+                    biom_table.update_ids(ids_map, 'sample', True, True)
+
+                # add the metadata column for the study the samples come from;
+                # this is useful in case the user downloads the bioms
+                study_md = {'Study': artifact.study.title, 'Processed_id': aid}
+                samples_md = {sid: study_md for sid in selected_samples}
+                biom_table.add_metadata(samples_md, axis='sample')
                 data_type = artifact.data_type

-                # this is not checking the reference used for picking
-                # issue #164
                 if new_tables[data_type] is None:
-                    new_tables[data_type] = table
+                    new_tables[data_type] = biom_table
                 else:
-                    new_tables[data_type] = new_tables[data_type].merge(table)
+                    new_tables[data_type] = \
+                        new_tables[data_type].merge(biom_table)

             # add the new tables to the analysis
             _, base_fp = qdb.util.get_mountpoint(self._table)[0]
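For intuition on the `update_ids` call, here is a tiny in-memory biom table run through the same renaming before tables of one data type get merged (ids invented; `Table` constructor and `update_ids` as provided by the biom-format package):

```python
import numpy as np
from biom import Table

# a tiny table for artifact id 4 with two (invented) samples
aid = 4
biom_table = Table(np.array([[1, 2], [3, 4]]),
                   observation_ids=['OTU1', 'OTU2'],
                   sample_ids=['SKB8.640193', 'SKD8.640184'])

# same renaming as in the diff: prefix every sample id with the artifact id
ids_map = {_id: "%d.%s" % (aid, _id) for _id in biom_table.ids()}
biom_table.update_ids(ids_map, 'sample', True, True)

print(list(biom_table.ids()))  # ['4.SKB8.640193', '4.SKD8.640184']
```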
@@ -870,11 +883,11 @@ def _build_biom_tables(self, samples, rarefaction_depth):
                 self._add_file("%d_analysis_%s.biom" % (self._id, dt),
                                "biom", data_type=dt)

-    def _build_mapping_file(self, samples):
+    def _build_mapping_file(self, samples, duplicated_samples=False):
         """Builds the combined mapping file for all samples
         Code modified slightly from qiime.util.MetadataMap.__add__"""
         with qdb.sql_connection.TRN:
-            all_sample_ids = set()
+            # query to get the latest qiime mapping file
             sql = """SELECT filepath_id, filepath
                      FROM qiita.filepath
                      JOIN qiita.prep_template_filepath USING (filepath_id)
@@ -883,47 +896,45 @@ def _build_mapping_file(self, samples):
                      WHERE filepath_type = 'qiime_map'
                      AND artifact_id IN (SELECT *
                                          FROM qiita.find_artifact_roots(%s))
-                     ORDER BY filepath_id DESC"""
-            _id, fp = qdb.util.get_mountpoint('templates')[0]
-            to_concat = []
+                     ORDER BY filepath_id DESC LIMIT 1"""
+            _, fp = qdb.util.get_mountpoint('templates')[0]
Review comment: And this is the same that
Reply: I think that's clear from the code as we are discarding `_`. Not sure this is needed. Do you feel strong about it?
Reply: Nope
-            for pid, samples in viewitems(samples):
-                if len(samples) != len(set(samples)):
-                    duplicates = find_duplicates(samples)
-                    raise qdb.exceptions.QiitaDBError(
-                        "Duplicate sample ids found: %s"
-                        % ', '.join(duplicates))
-                # Get the QIIME mapping file
+            all_ids = set()
+            to_concat = []
+            for pid, samps in viewitems(samples):
                 qdb.sql_connection.TRN.add(sql, [pid])
-                qiime_map_fp = \
-                    qdb.sql_connection.TRN.execute_fetchindex()[0][1]
+                qm_fp = qdb.sql_connection.TRN.execute_fetchindex()[0][1]

-                # Parse the mapping file
-                qiime_map = pd.read_csv(
-                    join(fp, qiime_map_fp), sep='\t', keep_default_na=False,
-                    na_values=['unknown'], index_col=False,
-                    converters=defaultdict(lambda: str))
-                qiime_map.set_index('#SampleID', inplace=True, drop=True)
-                qiime_map = qiime_map.loc[samples]
-
-                duplicates = all_sample_ids.intersection(qiime_map.index)
-                if duplicates or len(samples) != len(set(samples)):
-                    # Duplicate samples so raise error
-                    raise qdb.exceptions.QiitaDBError(
-                        "Duplicate sample ids found: %s"
-                        % ', '.join(duplicates))
-                all_sample_ids.update(qiime_map.index)
-                to_concat.append(qiime_map)
+                qm = pd.read_csv(join(fp, qm_fp), sep='\t',
+                                 keep_default_na=False, na_values=['unknown'],
+                                 index_col=False,
+                                 converters=defaultdict(lambda: str))
+
+                # if we are not going to merge the duplicated samples
+                # append the pid to the sample name
+                if duplicated_samples:
+                    samps = set(samps) - all_ids
+                    all_ids.update(samps)
+                else:
+                    qm['#SampleID'] = qm['#SampleID'] + ".%d" % pid
+                    qm['qiita_pid'] = pid
+                    samps = ['%s.%d' % (_id, pid) for _id in samps]
+
+                qm.set_index('#SampleID', inplace=True, drop=True)
+                qm = qm.loc[samps]
+                to_concat.append(qm)

             merged_map = pd.concat(to_concat)

             # forcing QIIME column order
             cols = merged_map.columns.values.tolist()
             cols.remove('BarcodeSequence')
             cols.remove('LinkerPrimerSequence')
             cols.remove('Description')
-            new_cols = ['BarcodeSequence', 'LinkerPrimerSequence']
-            new_cols.extend(cols)
-            new_cols.append('Description')
-            merged_map = merged_map[new_cols]
+            cols = (['BarcodeSequence', 'LinkerPrimerSequence'] + cols +
+                    ['Description'])
+            merged_map = merged_map[cols]

             # Save the mapping file
             _, base_fp = qdb.util.get_mountpoint(self._table)[0]
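A compact sketch of what the default (non-merge) branch of the mapping-file loop produces, using invented per-artifact QIIME maps and plain pandas:

```python
import pandas as pd

# invented per-artifact QIIME maps; the real ones are read from disk
maps = {
    4: pd.DataFrame({'#SampleID': ['SKB8.640193'], 'BarcodeSequence': ['AAC']}),
    5: pd.DataFrame({'#SampleID': ['SKB8.640193'], 'BarcodeSequence': ['GGT']}),
}

to_concat = []
for pid, qm in maps.items():
    qm = qm.copy()
    # default path: suffix the artifact id (pid) so duplicated ids stay distinct
    qm['#SampleID'] = qm['#SampleID'] + ".%d" % pid
    qm['qiita_pid'] = pid
    qm.set_index('#SampleID', inplace=True, drop=True)
    to_concat.append(qm)

merged_map = pd.concat(to_concat)
# index: ['SKB8.640193.4', 'SKB8.640193.5'] -- no collision in the merged map
```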
Review comment: `flatten` has been removed in later versions of skbio. In other parts of the code, we have been using the construction below to use flatten:
Reply: K
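The construction the reviewer refers to is not captured on this page; as an illustration only, a common pattern for keeping a `flatten` helper working once `skbio.util.flatten` is gone looks like this (an assumption, not necessarily the exact snippet used in qiita):

```python
# hedged sketch: fall back to itertools when skbio no longer ships flatten
try:
    from skbio.util import flatten
except ImportError:
    from itertools import chain

    def flatten(items):
        """Shallow-flatten an iterable of iterables into a single list."""
        return list(chain.from_iterable(items))

flatten([[1, 2], [3]])  # [1, 2, 3]
```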