24 | 24 | from future.utils import viewitems
25 | 25 | from biom import load_table
26 | 26 | from biom.util import biom_open
    | 27 | +import pandas as pd
    | 28 | +from skbio.util import find_duplicates
27 | 29 |
28 | 30 | from qiita_core.exceptions import IncompetentQiitaDeveloperError
29 | 31 | from .sql_connection import SQLConnectionHandler
30 | 32 | from .base import QiitaStatusObject
31 |    | -from .data import ProcessedData, RawData
    | 33 | +from .data import ProcessedData
32 | 34 | from .study import Study
33 |    | -from .exceptions import QiitaDBStatusError  # QiitaDBNotImplementedError
    | 35 | +from .exceptions import QiitaDBStatusError, QiitaDBError
34 | 36 | from .util import (convert_to_id, get_work_base_dir,
35 |    | -                   get_mountpoint, get_table_cols, insert_filepaths)
    | 37 | +                   get_mountpoint, insert_filepaths)
36 | 38 |
37 | 39 |
38 | 40 | class Analysis(QiitaStatusObject):
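For context on the new imports: skbio.util's find_duplicates returns the set of
entries that appear more than once in an iterable, which is what the rewritten
duplicate check in the hunk below leans on. A minimal standalone sketch (the
sample ids are invented, and ValueError stands in for qiita's QiitaDBError):

    from skbio.util import find_duplicates

    samples = ['1.SKB1.640202', '1.SKB2.640194', '1.SKB1.640202']
    if len(samples) != len(set(samples)):
        # find_duplicates returns the set of repeated entries
        duplicates = find_duplicates(samples)
        raise ValueError("Duplicate sample ids found: %s"
                         % ', '.join(duplicates))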
@@ -719,78 +721,61 @@ def _build_mapping_file(self, samples, conn_handler=None):
719 | 721 |         Code modified slightly from qiime.util.MetadataMap.__add__"""
720 | 722 |         conn_handler = conn_handler if conn_handler is not None \
721 | 723 |             else SQLConnectionHandler()
722 |     | -        # We will keep track of all unique sample_ids and metadata headers
723 |     | -        # we have seen as we go, as well as studies already seen
    | 724 | +
724 | 725 |         all_sample_ids = set()
725 |     | -        all_headers = set(get_table_cols("required_sample_info", conn_handler))
726 |     | -        all_studies = set()
    | 726 | +        sql = """SELECT filepath_id, filepath
    | 727 | +                 FROM qiita.filepath
    | 728 | +                 JOIN qiita.prep_template_filepath USING (filepath_id)
    | 729 | +                 JOIN qiita.prep_template_preprocessed_data
    | 730 | +                     USING (prep_template_id)
    | 731 | +                 JOIN qiita.preprocessed_processed_data
    | 732 | +                     USING (preprocessed_data_id)
    | 733 | +                 JOIN qiita.filepath_type USING (filepath_type_id)
    | 734 | +                 WHERE processed_data_id = %s
    | 735 | +                     AND filepath_type = 'qiime_map'
    | 736 | +                 ORDER BY filepath_id DESC"""
    | 737 | +        _id, fp = get_mountpoint('templates')[0]
    | 738 | +        to_concat = []
727 | 739 |
728 |     | -        merged_data = defaultdict(lambda: defaultdict(lambda: None))
729 | 740 |         for pid, samples in viewitems(samples):
730 |     | -            if any([all_sample_ids.intersection(samples),
731 |     | -                    len(set(samples)) != len(samples)]):
732 |     | -                # duplicate samples so raise error
733 |     | -                raise ValueError("Duplicate sample ids found: %s" %
734 |     | -                                 str(all_sample_ids.intersection(samples)))
735 |     | -            all_sample_ids.update(samples)
736 |     | -            study_id = ProcessedData(pid).study
737 |     | -
738 |     | -            # create a convenience study object
739 |     | -            s = Study(study_id)
740 |     | -
741 |     | -            # get the ids to retrieve the data from the sample and prep tables
742 |     | -            sample_template_id = s.sample_template
743 |     | -            # you can have multiple different prep templates but we are only
744 |     | -            # using the one for 16S i.e. the last one ... sorry ;l
745 |     | -            # see issue https://github.com/biocore/qiita/issues/465
746 |     | -            prep_template_id = RawData(s.raw_data()[0]).prep_templates[-1]
747 |     | -
748 |     | -            if study_id in all_studies:
749 |     | -                # samples already added by other processed data file
750 |     | -                # with the study_id
751 |     | -                continue
752 |     | -            all_studies.add(study_id)
753 |     | -            # add headers to set of all headers found
754 |     | -            all_headers.update(get_table_cols("sample_%d" % sample_template_id,
755 |     | -                                              conn_handler))
756 |     | -            all_headers.update(get_table_cols("prep_%d" % prep_template_id,
757 |     | -                                              conn_handler))
758 |     | -            # NEED TO ADD COMMON PREP INFO Issue #247
759 |     | -            sql = ("SELECT rs.*, p.*, ss.* "
760 |     | -                   "FROM qiita.required_sample_info rs JOIN qiita.sample_{0} "
761 |     | -                   "ss USING(sample_id) JOIN qiita.prep_{1} p USING(sample_id)"
762 |     | -                   " WHERE rs.sample_id IN {2} AND rs.study_id = {3}".format(
763 |     | -                       sample_template_id, prep_template_id,
764 |     | -                       "(%s)" % ",".join("'%s'" % s for s in samples),
765 |     | -                       study_id))
766 |     | -            metadata = conn_handler.execute_fetchall(sql)
767 |     | -            # add all the metadata to merged_data
768 |     | -            for data in metadata:
769 |     | -                sample_id = data['sample_id']
770 |     | -                for header, value in viewitems(data):
771 |     | -                    if header in {'sample_id'}:
772 |     | -                        continue
773 |     | -                    merged_data[sample_id][header] = str(value)
774 |     | -
775 |     | -        # prep headers, making sure they follow mapping file format rules
776 |     | -        all_headers = list(all_headers - {'linkerprimersequence',
777 |     | -                           'barcodesequence', 'description', 'sample_id'})
778 |     | -        all_headers.sort()
779 |     | -        all_headers = ['BarcodeSequence', 'LinkerPrimerSequence'] + all_headers
780 |     | -        all_headers.append('Description')
781 |     | -
782 |     | -        # write mapping file out
    | 741 | +            if len(samples) != len(set(samples)):
    | 742 | +                duplicates = find_duplicates(samples)
    | 743 | +                raise QiitaDBError("Duplicate sample ids found: %s"
    | 744 | +                                   % ', '.join(duplicates))
    | 745 | +            # Get the QIIME mapping file
    | 746 | +            qiime_map_fp = conn_handler.execute_fetchall(sql, (pid,))[0][1]
    | 747 | +            # Parse the mapping file
    | 748 | +            qiime_map = pd.read_csv(
    | 749 | +                join(fp, qiime_map_fp), sep='\t', keep_default_na=False,
    | 750 | +                na_values=['unknown'], index_col=False,
    | 751 | +                converters=defaultdict(lambda: str))
    | 752 | +            qiime_map.set_index('#SampleID', inplace=True, drop=True)
    | 753 | +            qiime_map = qiime_map.loc[samples]
    | 754 | +
    | 755 | +            duplicates = all_sample_ids.intersection(qiime_map.index)
    | 756 | +            if duplicates or len(samples) != len(set(samples)):
    | 757 | +                # Duplicate samples so raise error
    | 758 | +                raise QiitaDBError("Duplicate sample ids found: %s"
    | 759 | +                                   % ', '.join(duplicates))
    | 760 | +            all_sample_ids.update(qiime_map.index)
    | 761 | +            to_concat.append(qiime_map)
    | 762 | +
    | 763 | +        merged_map = pd.concat(to_concat)
    | 764 | +
    | 765 | +        cols = merged_map.columns.values.tolist()
    | 766 | +        cols.remove('BarcodeSequence')
    | 767 | +        cols.remove('LinkerPrimerSequence')
    | 768 | +        cols.remove('Description')
    | 769 | +        new_cols = ['BarcodeSequence', 'LinkerPrimerSequence']
    | 770 | +        new_cols.extend(cols)
    | 771 | +        new_cols.append('Description')
    | 772 | +        merged_map = merged_map[new_cols]
    | 773 | +
    | 774 | +        # Save the mapping file
783 | 775 |         _, base_fp = get_mountpoint(self._table)[0]
784 | 776 |         mapping_fp = join(base_fp, "%d_analysis_mapping.txt" % self._id)
785 |     | -        with open(mapping_fp, 'w') as f:
786 |     | -            f.write("#SampleID\t%s\n" % '\t'.join(all_headers))
787 |     | -            for sample, metadata in viewitems(merged_data):
788 |     | -                data = [sample]
789 |     | -                for header in all_headers:
790 |     | -                    l_head = header.lower()
791 |     | -                    data.append(metadata[l_head] if
792 |     | -                                metadata[l_head] is not None else "no_data")
793 |     | -                f.write("%s\n" % "\t".join(data))
    | 777 | +        merged_map.to_csv(mapping_fp, index_label='#SampleID',
    | 778 | +                          na_rep='unknown', sep='\t')
794 | 779 |
795 | 780 |         self._add_file("%d_analysis_mapping.txt" % self._id,
796 | 781 |                        "plain_text", conn_handler=conn_handler)
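Taken together, the additions replace the per-study SQL merge with a
concatenation of the per-processed-data QIIME mapping files. A standalone
sketch of that pandas pattern, with a made-up merge_qiime_maps helper and
hypothetical file paths (the real method resolves paths via get_mountpoint and
the SQL above, and raises QiitaDBError instead of ValueError):

    from collections import defaultdict
    import pandas as pd

    def merge_qiime_maps(map_fps_and_samples, out_fp):
        # map_fps_and_samples: list of (mapping_fp, sample_ids) pairs
        to_concat = []
        seen = set()
        for fp, samples in map_fps_and_samples:
            # Read every column as str so values round-trip unchanged;
            # only the literal 'unknown' is treated as missing
            qiime_map = pd.read_csv(fp, sep='\t', keep_default_na=False,
                                    na_values=['unknown'], index_col=False,
                                    converters=defaultdict(lambda: str))
            qiime_map.set_index('#SampleID', inplace=True, drop=True)
            qiime_map = qiime_map.loc[samples]
            if seen.intersection(qiime_map.index):
                raise ValueError("Duplicate sample ids found")
            seen.update(qiime_map.index)
            to_concat.append(qiime_map)
        merged = pd.concat(to_concat)
        # QIIME mapping file format: BarcodeSequence and LinkerPrimerSequence
        # come right after #SampleID, Description goes last
        cols = merged.columns.tolist()
        for c in ('BarcodeSequence', 'LinkerPrimerSequence', 'Description'):
            cols.remove(c)
        order = (['BarcodeSequence', 'LinkerPrimerSequence'] + cols +
                 ['Description'])
        merged[order].to_csv(out_fp, sep='\t', index_label='#SampleID',
                             na_rep='unknown')

    # hypothetical usage:
    # merge_qiime_maps([('study1_qiime_map.txt', ['s1', 's2'])],
    #                  'analysis_mapping.txt')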