
Commit b72a1ac

DRY refactoring of qp-klp.

This code was originally reviewed and approved as part of PRs #60 and #61. The code has been refactored to be more extensible to new Assay types. The monolithic unit tests that required testing against Qiita-RC were removed; the new unit tests are more numerous and each exercises smaller, self-contained functionality. Qiita is no longer needed for testing: a FakeQiita class now emulates canned Qiita API queries, and job submission and SLURM responses are emulated with fake binaries.
Parent commit: bc1d524

25 files changed: 3,434 additions and 4,203 deletions
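
To illustrate the testing approach described in the commit message, here is a minimal sketch of what a FakeQiita-style test double could look like. The class name comes from the commit message, but the method names, endpoints, and payloads below are illustrative assumptions, not the plugin's actual test code:

    # Minimal sketch of a FakeQiita test double (assumed interface): it
    # returns canned payloads for known endpoints and records every call
    # so tests can assert on the traffic without a live Qiita server.
    class FakeQiita:
        def __init__(self, canned_responses):
            # maps (method, endpoint) -> canned JSON-like payload
            self.canned = canned_responses
            self.calls = []

        def get(self, endpoint):
            self.calls.append(('GET', endpoint))
            return self.canned[('GET', endpoint)]

        def post(self, endpoint, data=None):
            self.calls.append(('POST', endpoint, data))
            return self.canned[('POST', endpoint)]

    # usage in a test (hypothetical endpoint and payload):
    qclient = FakeQiita({('GET', '/qiita_db/studies/1'): {'study': 1}})
    assert qclient.get('/qiita_db/studies/1') == {'study': 1}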

.github/workflows/qiita-plugin-ci.yml

Lines changed: 2 additions & 3 deletions
@@ -86,6 +86,7 @@ jobs:
           pip --quiet install https://github.com/qiita-spots/qtp-job-output-folder/archive/refs/heads/main.zip
           pip --quiet install .
           pip --quiet install coveralls
+
           export QP_KLP_CONFIG_FP=`pwd`/configuration.json

           configure_qtp_job_output_folder --env-script "source /home/runner/.profile; conda activate klp" --server-cert $QIITA_SERVER_CERT
@@ -134,13 +135,10 @@ jobs:
           export QIITA_CONFIG_FP=`pwd`/qiita-dev/qiita_core/support_files/config_test_local.cfg
           export QP_KLP_CONFIG_FP=`pwd`/configuration.json
           export PYTHONWARNINGS="ignore:Certificate for localhost has no \`subjectAltName\`"
-
           nosetests --with-doctest --with-coverage -v --cover-package=qp_klp
-
       - uses: codecov/codecov-action@v3
         with:
           token: ${{ secrets.CODECOV_TOKEN }}
-          file: codecov.yml

   lint:
     runs-on: ubuntu-latest
@@ -157,3 +155,4 @@ jobs:
       run: |
         pip install -q flake8
         flake8 qp_klp setup.py scripts/*
+
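
The CI workflow above exercises the test suite without a SLURM cluster; per the commit message, job submission and SLURM responses are emulated with fake binaries placed on the test PATH. A minimal sketch of what such a fake sbatch could look like (the canned job-id and exit behavior are assumptions, not the repository's actual fake binary):

    #!/usr/bin/env python
    # fake `sbatch`: accepts any arguments, pretends the job was submitted,
    # and prints a job-id line in the style of the real sbatch binary.
    import sys

    if __name__ == '__main__':
        print('Submitted batch job 9999999')
        sys.exit(0)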

.gitignore

Lines changed: 1 addition & 0 deletions
@@ -128,5 +128,6 @@ dmypy.json
 # Pyre type checker
 .pyre/

+
 # Plugin configuration file
 configuration.json

qp_klp/Amplicon.py

Lines changed: 135 additions & 0 deletions
@@ -0,0 +1,135 @@
from os import listdir, makedirs
from os.path import join, isfile, basename
from shutil import copyfile
from qp_klp.Step import Step


class Amplicon(Step):
    def __init__(self, pipeline, master_qiita_job_id,
                 status_update_callback=None):
        super().__init__(pipeline,
                         master_qiita_job_id,
                         status_update_callback)

        if pipeline.pipeline_type != Step.AMPLICON_TYPE:
            raise ValueError("Cannot create an Amplicon run using a "
                             f"{pipeline.pipeline_type}-configured Pipeline.")

    def convert_bcl_to_fastq(self):
        # The 'bcl2fastq' key is a convention hard-coded into the mg-scripts
        # and qp-klp projects. By design and history, amplicon jobs use EMP
        # primers and are demultiplexed downstream of qp-klp by Qiita. This
        # necessitates the use of bcl2fastq and a 'dummy' sample-sheet to
        # avoid the demultiplexing that otherwise occurs at this stage. The
        # name and path of the executable, the resource requirements to
        # instantiate a SLURM job with, etc. are stored in
        # configuration['bcl2fastq'].
        config = self.pipeline.configuration['bcl2fastq']
        super()._convert_bcl_to_fastq(config, self.pipeline.sample_sheet)

    def quality_control(self):
        # Quality control for Amplicon runs occurs downstream.
        # Do not perform QC at this time.

        # Simulate QCJob's output directory for use as input into FastQCJob.
        projects = self.pipeline.get_project_info()
        projects = [x['project_name'] for x in projects]

        for project_name in projects:
            # copy the files from ConvertJob output to faked QCJob output
            # folder: $WKDIR/$RUN_ID/QCJob/$PROJ_NAME/amplicon
            output_folder = join(self.pipeline.output_path,
                                 'QCJob',
                                 project_name,
                                 # for legacy purposes, output folders are
                                 # either 'trimmed_sequences', 'amplicon', or
                                 # 'filtered_sequences'. Hence, this folder
                                 # is not defined using AMPLICON_TYPE as that
                                 # value may or may not equal the needed value.
                                 'amplicon')

            makedirs(output_folder)

            raw_fastq_files_path = join(self.pipeline.output_path,
                                        'ConvertJob')

            # get the list of all raw output files to be copied.
            job_output = [join(raw_fastq_files_path, x) for x in
                          listdir(raw_fastq_files_path)]
            job_output = [x for x in job_output if isfile(x)]
            job_output = [x for x in job_output if x.endswith('fastq.gz')]
            # Undetermined files are very small and should be filtered from
            # results.
            job_output = [x for x in job_output if
                          not basename(x).startswith('Undetermined')]

            # copy the files
            for fastq_file in job_output:
                new_path = join(output_folder, basename(fastq_file))
                copyfile(fastq_file, new_path)

            # FastQC expects the ConvertJob output to also be organized by
            # project. Since this would entail running the same ConvertJob
            # multiple times on the same input with just a name-change in
            # the dummy sample-sheet, we instead perform ConvertJob once
            # and copy the output from ConvertJob's output folder into
            # ConvertJob's output folder/project1, project2 ... projectN.
            output_folder = join(raw_fastq_files_path, project_name)
            makedirs(output_folder)

            job_output = [join(raw_fastq_files_path, x) for x in
                          listdir(raw_fastq_files_path)]
            job_output = [x for x in job_output if isfile(x) and x.endswith(
                'fastq.gz') and not basename(x).startswith('Undetermined')]

            for raw_fastq_file in job_output:
                new_path = join(output_folder, basename(raw_fastq_file))
                copyfile(raw_fastq_file, new_path)

    def generate_reports(self):
        super()._generate_reports()
        return None  # amplicon doesn't need project names

    def _get_data_type(self, prep_file_path):
        metadata = Step.parse_prep_file(prep_file_path)
        if 'target_gene' in metadata.columns:
            # remove duplicate values, then convert back to list for
            # accession.
            tg = list(set(metadata['target_gene']))
            if len(tg) != 1:
                raise ValueError("More than one value for target_gene")

            if tg[0] in Step.AMPLICON_SUB_TYPES:
                return tg[0]

            raise ValueError(f"'{tg[0]}' is not a valid type - valid data-"
                             f"types are {Step.AMPLICON_SUB_TYPES}")
        else:
            raise ValueError("'target_gene' column not present in "
                             "generated prep-files")

    def generate_touched_studies(self, qclient):
        results = {}
        for study_id, pf_paths in self.prep_file_paths.items():
            for pf_path in pf_paths:
                results[pf_path] = self._get_data_type(pf_path)

        super()._generate_touched_studies(qclient, results)

    def generate_prep_file(self):
        config = self.pipeline.configuration['seqpro']
        seqpro_path = config['seqpro_path'].replace('seqpro', 'seqpro_mf')
        project_names = [x['project_name'] for x in
                         self.pipeline.get_project_info()]

        job = super()._generate_prep_file(config,
                                          self.pipeline.mapping_file_path,
                                          seqpro_path,
                                          project_names)

        self.prep_file_paths = job.prep_file_paths

    def generate_commands(self, qclient):
        super()._generate_commands()
        self.cmds.append(f'cd {self.pipeline.output_path}; '
                         'tar zcvf reports-ConvertJob.tgz ConvertJob/Reports')
        self.write_commands_to_output_path()
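
For orientation, here is a minimal sketch of how a caller might drive this subclass. The driver function and call order are assumptions inferred from the method names above, not code from this commit:

    # Hypothetical driver, for illustration only; `pipeline` is assumed to
    # be a sequence_processing_pipeline Pipeline already configured for an
    # amplicon run (Amplicon's constructor raises ValueError otherwise).
    from qp_klp.Amplicon import Amplicon

    def process_amplicon_run(pipeline, job_id, qclient):
        step = Amplicon(pipeline, master_qiita_job_id=job_id)
        step.convert_bcl_to_fastq()        # bcl2fastq with the 'dummy' sheet
        step.quality_control()             # fakes QCJob output for FastQCJob
        step.generate_reports()            # FastQCJob
        step.generate_prep_file()          # seqpro_mf on the mapping file
        step.generate_touched_studies(qclient)
        step.generate_commands(qclient)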

qp_klp/Metagenomic.py

Lines changed: 172 additions & 0 deletions
@@ -0,0 +1,172 @@
from os import walk
from os.path import exists, join, basename
from sequence_processing_pipeline.PipelineError import PipelineError
import pandas as pd
from qp_klp.Step import FailedSamplesRecord, Step


class Metagenomic(Step):
    def __init__(self, pipeline, master_qiita_job_id,
                 status_update_callback=None):
        super().__init__(pipeline,
                         master_qiita_job_id,
                         status_update_callback)

        if pipeline.pipeline_type not in Step.META_TYPES:
            raise ValueError("Cannot instantiate Metagenomic object from "
                             f"pipeline of type '{pipeline.pipeline_type}'")

        # Note: FailedSamplesRecord is not used when processing amplicon as
        # the samples are processed as a single fastq file and hence that
        # info is not available.
        self.fsr = FailedSamplesRecord(self.pipeline.output_path,
                                       pipeline.sample_sheet.samples)

    def convert_bcl_to_fastq(self):
        # The 'bcl-convert' key is a convention hard-coded into the
        # mg-scripts and qp-klp projects. Currently meta*omic jobs use
        # bcl-convert for its improved performance over bcl2fastq. The
        # name and path of the executable, the resource requirements to
        # instantiate a SLURM job with, etc. are stored in
        # configuration['bcl-convert'].
        config = self.pipeline.configuration['bcl-convert']
        job = super()._convert_bcl_to_fastq(config,
                                            self.pipeline.sample_sheet.path)
        self.fsr.write(job.audit(self.pipeline.get_sample_ids()), 'ConvertJob')

    def quality_control(self):
        config = self.pipeline.configuration['qc']
        job = super()._quality_control(config, self.pipeline.sample_sheet.path)
        self.fsr.write(job.audit(self.pipeline.get_sample_ids()), 'QCJob')

    def generate_reports(self):
        job = super()._generate_reports()
        self.fsr.write(job.audit(self.pipeline.get_sample_ids()), 'FastQCJob')

        self.project_names = job.project_names

    def generate_prep_file(self):
        config = self.pipeline.configuration['seqpro']

        if self.project_names is None:
            raise ValueError("reports not yet generated")

        job = super()._generate_prep_file(config,
                                          self.pipeline.sample_sheet.path,
                                          config['seqpro_path'],
                                          self.project_names)

        self.prep_file_paths = job.prep_file_paths

    def generate_touched_studies(self, qclient):
        results = {}

        for study_id, pf_paths in self.prep_file_paths.items():
            for pf_path in pf_paths:
                # record the data-type as either metagenomic or
                # metatranscriptomic, according to what's stored in the
                # pipeline.
                results[pf_path] = self.pipeline.pipeline_type

        super()._generate_touched_studies(qclient, results)

    def generate_commands(self, qclient):
        super()._generate_commands()

        out_dir = self.pipeline.output_path

        self.cmds.append(f'cd {out_dir}; '
                         'tar zcvf logs-QCJob.tgz QCJob/logs')

        self.cmds.append(f'cd {out_dir}; '
                         'tar zcvf reports-ConvertJob.tgz ConvertJob/Reports '
                         'ConvertJob/Logs')

        self.write_commands_to_output_path()

        if self.sifs:
            # convert sifs into a space-separated list of filenames for
            # tarballing; the sifs should all be stored in {out_dir} by
            # default.
            tmp = ' '.join([basename(x) for x in self.sifs])
            self.cmds.append(f'cd {out_dir}; tar zcvf sample-files.tgz {tmp}')

        csv_fps = []
        for root, dirs, files in walk(join(out_dir, 'PrepFiles')):
            for csv_file in files:
                csv_fps.append(join(root, csv_file))

        touched_studies = []

        for project, upload_dir, qiita_id in self.special_map:
            # sif filenames are of the form:
            # {run_id}_{project}_blanks.tsv
            blanks_file = f'{self.pipeline.run_id}_{project}_blanks.tsv'
            if self.sifs and [x for x in self.sifs if blanks_file in x]:
                # move uncompressed sifs to upload_dir.
                self.cmds.append(f'cd {out_dir}; '
                                 f'mv {blanks_file} {upload_dir}')

            # record that something is being moved into a Qiita Study.
            # this will allow us to notify the user which Studies to
            # review upon completion.
            touched_studies.append((qiita_id, project))

            if self.pipeline.pipeline_type in Step.META_TYPES:
                self.cmds.append(f'cd {out_dir}; tar zcvf reports-QCJob.tgz '
                                 f'QCJob/{project}/fastp_reports_dir')

            if exists(f'{out_dir}/QCJob/{project}/filtered_sequences'):
                self.cmds.append(f'cd {out_dir}; mv '
                                 f'QCJob/{project}/filtered_sequences/* '
                                 f'{upload_dir}')
            elif exists(f'{out_dir}/QCJob/{project}/trimmed_sequences'):
                self.cmds.append(f'cd {out_dir}; mv '
                                 f'QCJob/{project}/trimmed_sequences/* '
                                 f'{upload_dir}')
            elif exists(f'{out_dir}/QCJob/{project}/amplicon'):
                self.cmds.append(f'cd {out_dir}; mv '
                                 f'QCJob/{project}/amplicon/* '
                                 f'{upload_dir}')
            else:
                raise PipelineError("QCJob output not in expected location")

            for csv_file in csv_fps:
                if project in csv_file:
                    self.cmds.append(f'cd {out_dir}; '
                                     f'mv {csv_file} {upload_dir}')
                    break

        # create a set of unique study-ids that were touched by the Pipeline
        # and return this information to the user.
        touched_studies = sorted(list(set(touched_studies)))

        data = []
        for qiita_id, project in touched_studies:
            for prep_id in self.touched_studies_prep_info[qiita_id]:
                surl = f'{qclient._server_url}/study/description/{qiita_id}'
                prep_url = (f'{qclient._server_url}/study/description/'
                            f'{qiita_id}?prep_id={prep_id}')
                data.append({'Project': project, 'Qiita Study ID': qiita_id,
                             'Qiita Prep ID': prep_id, 'Qiita URL': surl,
                             'Prep URL': prep_url})

        df = pd.DataFrame(data)

        with open(join(out_dir, 'touched_studies.html'), 'w') as f:
            f.write(df.to_html(border=2, index=False, justify="left",
                               render_links=True, escape=False))

        # copy all tgz files, including sample-files.tgz, to final_results.
        self.cmds.append(f'cd {out_dir}; mv *.tgz final_results')
        self.cmds.append(f'cd {out_dir}; mv FastQCJob/multiqc final_results')

        if exists(join(out_dir, 'touched_studies.html')):
            self.cmds.append(f'cd {out_dir}; '
                             'mv touched_studies.html final_results')

        if exists(join(out_dir, 'failed_samples.html')):
            self.cmds.append(f'cd {out_dir}; '
                             'mv failed_samples.html final_results')
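
The FailedSamplesRecord used above is imported from qp_klp.Step; only its constructor and write() appear in this diff. Here is a minimal sketch of an object satisfying that interface, illustrating the audit/write pattern. The "first failure wins" policy, the treatment of samples as hashable ids, and the failed_samples.html output name (suggested by the exists() check above) are assumptions:

    import pandas as pd
    from os.path import join

    class FailedSamplesRecordSketch:
        """Illustrative stand-in for qp_klp.Step.FailedSamplesRecord."""
        def __init__(self, output_path, samples):
            self.output_path = output_path
            # map each sample to the first job stage it failed at
            # (None = not failed); samples are treated as ids here.
            self.failed = {s: None for s in samples}

        def write(self, failed_ids, job_name):
            # job.audit() is assumed to return the ids of failed samples;
            # record only the first stage at which each sample failed,
            # then persist the running tally as an HTML report.
            for sample_id in failed_ids:
                if self.failed.get(sample_id) is None:
                    self.failed[sample_id] = job_name
            rows = [{'Sample': k, 'Failed at': v}
                    for k, v in self.failed.items() if v is not None]
            pd.DataFrame(rows).to_html(
                join(self.output_path, 'failed_samples.html'), index=False)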
