from os import walk
from os.path import basename, exists, join

import pandas as pd

from qp_klp.Step import FailedSamplesRecord, Step
from sequence_processing_pipeline.PipelineError import PipelineError


class Metagenomic(Step):
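    """A Step subclass for metagenomic and metatranscriptomic runs.

    Unlike amplicon processing, meta*omic runs demultiplex into per-sample
    fastq files, so per-sample failures can be tracked via a
    FailedSamplesRecord.
    """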
    def __init__(self, pipeline, master_qiita_job_id,
                 status_update_callback=None):
        super().__init__(pipeline,
                         master_qiita_job_id,
                         status_update_callback)

        if pipeline.pipeline_type not in Step.META_TYPES:
            raise ValueError("Cannot instantiate Metagenomic object from "
                             f"pipeline of type '{pipeline.pipeline_type}'")

        # Note: unlike amplicon processing, where all samples are processed
        # as a single fastq file and per-sample failure info is therefore
        # unavailable, meta*omic runs can record which samples failed.
        self.fsr = FailedSamplesRecord(self.pipeline.output_path,
                                       pipeline.sample_sheet.samples)

    def convert_bcl_to_fastq(self):
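        """Convert the run's raw BCL files into per-sample fastq files.

        Records any samples the ConvertJob flags as failed.
        """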
        # The 'bcl-convert' key is a convention hard-coded into the
        # mg-scripts and qp-klp projects. Currently meta*omic jobs use
        # bcl-convert for its improved performance over bcl2fastq. The name
        # and path of the executable, the resource requirements used to
        # instantiate a SLURM job, etc. are stored in
        # configuration['bcl-convert'].
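        # An illustrative sketch of that entry's shape (these keys are
        # assumptions; the authoritative schema lives in the mg-scripts
        # configuration):
        #   {'executable_path': ..., 'queue': ..., 'nodes': ...,
        #    'wallclock_time_in_minutes': ..., 'modules_to_load': [...]}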
        config = self.pipeline.configuration['bcl-convert']
        job = super()._convert_bcl_to_fastq(config,
                                            self.pipeline.sample_sheet.path)
        self.fsr.write(job.audit(self.pipeline.get_sample_ids()),
                       'ConvertJob')

    def quality_control(self):
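        """Quality-control the fastq files per the 'qc' configuration.

        Records any samples the QCJob flags as failed.
        """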
        config = self.pipeline.configuration['qc']
        job = super()._quality_control(config,
                                       self.pipeline.sample_sheet.path)
        self.fsr.write(job.audit(self.pipeline.get_sample_ids()), 'QCJob')

    def generate_reports(self):
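        """Generate FastQC/MultiQC reports for the run.

        Also remembers the project names discovered by the job, which
        generate_prep_file() requires.
        """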
        job = super()._generate_reports()
        self.fsr.write(job.audit(self.pipeline.get_sample_ids()),
                       'FastQCJob')

        self.project_names = job.project_names

    def generate_prep_file(self):
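        """Generate Qiita prep-files for each project using seqpro."""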
        config = self.pipeline.configuration['seqpro']

        if self.project_names is None:
            raise ValueError("generate_reports() must be called before "
                             "generate_prep_file()")

        job = super()._generate_prep_file(config,
                                          self.pipeline.sample_sheet.path,
                                          config['seqpro_path'],
                                          self.project_names)

        self.prep_file_paths = job.prep_file_paths

    def generate_touched_studies(self, qclient):
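        """Register each prep-file with Qiita, tagged with its data-type."""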
        results = {}

        for study_id, pf_paths in self.prep_file_paths.items():
            for pf_path in pf_paths:
                # record the data-type as either metagenomic or
                # metatranscriptomic, according to what's stored in the
                # pipeline.
                results[pf_path] = self.pipeline.pipeline_type

        super()._generate_touched_studies(qclient, results)

    def generate_commands(self, qclient):
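        """Queue the shell commands that package and stage the run's output.

        Tarballs logs and reports, moves per-project sequences, sifs and
        prep-files into their upload directories, writes an HTML summary
        of the touched Qiita studies, and stages everything under
        final_results.
        """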
        super()._generate_commands()

        out_dir = self.pipeline.output_path

        self.cmds.append(f'cd {out_dir}; '
                         'tar zcvf logs-QCJob.tgz QCJob/logs')

        self.cmds.append(f'cd {out_dir}; '
                         'tar zcvf reports-ConvertJob.tgz ConvertJob/Reports '
                         'ConvertJob/Logs')

        self.write_commands_to_output_path()

        if self.sifs:
            # the sifs are all stored in out_dir by default, so their
            # filenames alone are enough to tarball them from there.
            sif_fns = ' '.join([basename(x) for x in self.sifs])
            self.cmds.append(f'cd {out_dir}; tar zcvf sample-files.tgz '
                             f'{sif_fns}')

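        # gather the paths of all prep-files written under PrepFiles/.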
        csv_fps = []
        for root, dirs, files in walk(join(out_dir, 'PrepFiles')):
            for csv_file in files:
                csv_fps.append(join(root, csv_file))

        touched_studies = []

        for project, upload_dir, qiita_id in self.special_map:
            # sif filenames are of the form:
            # {run_id}_{project}_blanks.tsv
            blanks_file = f'{self.pipeline.run_id}_{project}_blanks.tsv'
            if self.sifs and any(blanks_file in x for x in self.sifs):
                # move uncompressed sifs to upload_dir.
                tmp = f'cd {out_dir}; mv {blanks_file} {upload_dir}'
                self.cmds.append(tmp)

            # record that something is being moved into a Qiita Study.
            # this will allow us to notify the user which Studies to
            # review upon completion.
            touched_studies.append((qiita_id, project))

            if self.pipeline.pipeline_type in Step.META_TYPES:
                self.cmds.append(f'cd {out_dir}; tar zcvf reports-QCJob.tgz '
                                 f'QCJob/{project}/fastp_reports_dir')

            if exists(f'{out_dir}/QCJob/{project}/filtered_sequences'):
                self.cmds.append(f'cd {out_dir}; mv '
                                 f'QCJob/{project}/filtered_sequences/* '
                                 f'{upload_dir}')
            elif exists(f'{out_dir}/QCJob/{project}/trimmed_sequences'):
                self.cmds.append(f'cd {out_dir}; mv '
                                 f'QCJob/{project}/trimmed_sequences/* '
                                 f'{upload_dir}')
            elif exists(f'{out_dir}/QCJob/{project}/amplicon'):
                self.cmds.append(f'cd {out_dir}; mv '
                                 f'QCJob/{project}/amplicon/* '
                                 f'{upload_dir}')
            else:
                raise PipelineError("QCJob output not in expected location")

            for csv_file in csv_fps:
                if project in csv_file:
                    tmp = f'cd {out_dir}; mv {csv_file} {upload_dir}'
                    self.cmds.append(tmp)
                    break

        # create a set of unique study-ids that were touched by the Pipeline
        # and return this information to the user.
        touched_studies = sorted(set(touched_studies))

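        # build an HTML table linking each touched study and prep to its
        # Qiita page, for the user to review on completion.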
        data = []
        for qiita_id, project in touched_studies:
            for prep_id in self.touched_studies_prep_info[qiita_id]:
                surl = f'{qclient._server_url}/study/description/{qiita_id}'
                prep_url = (f'{qclient._server_url}/study/description/'
                            f'{qiita_id}?prep_id={prep_id}')
                data.append({'Project': project, 'Qiita Study ID': qiita_id,
                             'Qiita Prep ID': prep_id, 'Qiita URL': surl,
                             'Prep URL': prep_url})

        df = pd.DataFrame(data)

        with open(join(out_dir, 'touched_studies.html'), 'w') as f:
            f.write(df.to_html(border=2, index=False, justify="left",
                               render_links=True, escape=False))

        # move all tgz files, including sample-files.tgz, to final_results.
        self.cmds.append(f'cd {out_dir}; mv *.tgz final_results')
        self.cmds.append(f'cd {out_dir}; mv FastQCJob/multiqc final_results')

        if exists(join(out_dir, 'touched_studies.html')):
            tmp = f'cd {out_dir}; mv touched_studies.html final_results'
            self.cmds.append(tmp)

        if exists(join(out_dir, 'failed_samples.html')):
            tmp = f'cd {out_dir}; mv failed_samples.html final_results'
            self.cmds.append(tmp)