7
7
# -----------------------------------------------------------------------------
8
8
9
9
from os .path import basename , join , isdir , isfile , exists
10
+ from shutil import copyfile
10
11
from os import makedirs , remove , listdir
11
12
from datetime import date , timedelta
12
13
from urllib import quote
26
27
from qiita_db .ontology import Ontology
27
28
from qiita_db .util import convert_to_id , get_mountpoint , open_file
28
29
from qiita_db .artifact import Artifact
30
+ from qiita_db .metadata_template .constants import (
31
+ TARGET_GENE_DATA_TYPES , PREP_TEMPLATE_COLUMNS_TARGET_GENE )
32
+ from qiita_db .processing_job import _system_call as system_call
29
33
30
34
31
35
def clean_whitespace (text ):
@@ -171,10 +175,15 @@ def __init__(self, artifact_id, action):
171
175
self .publications = self .study .publications
172
176
173
177
# getting the restrictions
174
- st_missing = self .sample_template .check_restrictions (
175
- [self .sample_template .columns_restrictions ['EBI' ]])
176
- pt_missing = self .prep_template .check_restrictions (
177
- [self .prep_template .columns_restrictions ['EBI' ]])
178
+ st_restrictions = [self .sample_template .columns_restrictions ['EBI' ]]
179
+ pt_restrictions = [self .prep_template .columns_restrictions ['EBI' ]]
180
+ if self .artifact .data_type in TARGET_GENE_DATA_TYPES :
181
+ # adding restrictions on primer and barcode as these are
182
+ # conditionally required for target gene
183
+ pt_restrictions .append (
184
+ PREP_TEMPLATE_COLUMNS_TARGET_GENE ['demultiplex' ])
185
+ st_missing = self .sample_template .check_restrictions (st_restrictions )
186
+ pt_missing = self .prep_template .check_restrictions (pt_restrictions )
178
187
# testing if there are any missing columns
179
188
if st_missing :
180
189
error_msgs .append ("Missing column in the sample template: %s" %
@@ -907,6 +916,80 @@ def data_retriever(key, trans_dict):
907
916
return (study_accession , sample_accessions , biosample_accessions ,
908
917
experiment_accessions , run_accessions )
909
918
919
def _generate_demultiplexed_fastq_per_sample_FASTQ(self):
    """Stage one gzipped FASTQ per sample from a per_sample_FASTQ artifact

    Every 'raw_forward_seqs' filepath of the artifact is paired with a
    sample by testing whether the file's basename starts with the sample's
    run prefix. The prefix is read from the 'run_prefix' category of the
    prep template when that category exists; otherwise it is whatever
    follows the first '.' in the prep template key. A matched file is
    copied as-is when its name already ends in '.gz' and compressed with
    ``gzip -c`` otherwise; the destination is ``self.sample_demux_fps[sn]``.

    Returns
    -------
    tuple of (set, set)
        The names of the samples for which a file was staged, and the keys
        of ``self.samples`` for which no file was found.

    Raises
    ------
    EBISubmissionError
        If the gzip command exits with a non-zero status, or if any
        'raw_forward_seqs' filepath could not be paired with a sample.
    """
    # The shell-quoting helper lives in `shlex` on Python 3 and in `pipes`
    # on Python 2. It is imported here, under a different name, because
    # this module already imports `quote` from urllib.
    try:
        from shlex import quote as shell_quote
    except ImportError:
        from pipes import quote as shell_quote

    ar = self.artifact
    fps = [(basename(fp), fp) for _, fp, fpt in ar.filepaths
           if fpt == 'raw_forward_seqs']
    fps.sort(key=lambda x: x[1])

    if 'run_prefix' in self.prep_template.categories():
        rps = list(viewitems(
            self.prep_template.get_category('run_prefix')))
    else:
        rps = [(v, v.split('.', 1)[1]) for v in self.prep_template.keys()]
    rps.sort(key=lambda x: x[1])

    demux_samples = set()
    for sn, rp in rps:
        for i, (bn, fp) in enumerate(fps):
            if not bn.startswith(rp):
                continue

            demux_samples.add(sn)
            new_fp = self.sample_demux_fps[sn]
            if fp.endswith('.gz'):
                copyfile(fp, new_fp)
            else:
                # Both paths are interpolated into a command string, so
                # they are quoted to survive spaces and shell
                # metacharacters in file names.
                cmd = "gzip -c %s > %s" % (
                    shell_quote(fp), shell_quote(new_fp))
                stdout, stderr, rv = system_call(cmd)
                if rv != 0:
                    error_msg = (
                        "Error:\n Std output:%s\n Std error:%s"
                        % (stdout, stderr))
                    raise EBISubmissionError(error_msg)

            # Each file is consumed by at most one sample; removing it is
            # safe because the inner loop stops right after the deletion.
            del fps[i]
            break

    # Anything still in `fps` did not match any sample's run prefix.
    if fps:
        error_msg = (
            'Discrepancy between filepaths and sample names. Extra'
            ' filepaths: %s' % ', '.join([fp[0] for fp in fps]))
        LogEntry.create('Runtime', error_msg)
        raise EBISubmissionError(error_msg)

    return demux_samples, \
        set(self.samples.keys()).difference(demux_samples)
958
+
959
def _generate_demultiplexed_fastq_demux(self, mtime):
    """Write one gzipped FASTQ per sample out of the artifact's demux file

    The artifact's first 'preprocessed_demux' filepath is opened and split
    per sample with ``to_per_sample_ascii``, restricted to the keys of the
    prep template. Each sample's records are written to
    ``self.sample_demux_fps[sample]``. A sample that yields no records has
    its output file removed and is dropped from ``self.samples``,
    ``self.samples_prep`` and ``self.sample_demux_fps``.

    Parameters
    ----------
    mtime : object
        Passed through unchanged as the ``mtime`` argument of ``GzipFile``.

    Returns
    -------
    set
        The names of the samples for which at least one record was written.

    Raises
    ------
    EBISubmissionError
        If the opened demux file is not an instance of ``File``.
    """
    # An artifact is expected to hold a single `preprocessed_demux` file,
    # so only the first match is used.
    demux_paths = [fp for _, fp, fp_type in self.artifact.filepaths
                   if fp_type == 'preprocessed_demux']
    demux_fp = demux_paths[0]

    samples_written = set()
    with open_file(demux_fp) as demux_fh:
        if not isinstance(demux_fh, File):
            error_msg = "'%s' doesn't look like a demux file" % demux_fp
            LogEntry.create('Runtime', error_msg)
            raise EBISubmissionError(error_msg)

        per_sample = to_per_sample_ascii(demux_fh,
                                         self.prep_template.keys())
        for sample_name, records in per_sample:
            out_fp = self.sample_demux_fps[sample_name]
            has_records = False
            with GzipFile(out_fp, mode='w', mtime=mtime) as out_fh:
                for rec in records:
                    out_fh.write(rec)
                    has_records = True

            if has_records:
                samples_written.add(sample_name)
                continue

            # Nothing was written: forget the sample and discard the
            # empty gzip file that was just created.
            del self.samples[sample_name]
            del self.samples_prep[sample_name]
            del self.sample_demux_fps[sample_name]
            remove(out_fp)

    return samples_written
992
+
910
993
def generate_demultiplexed_fastq (self , rewrite_fastq = False , mtime = None ):
911
994
"""Generates demultiplexed fastq
912
995
@@ -942,39 +1025,16 @@ def generate_demultiplexed_fastq(self, rewrite_fastq=False, mtime=None):
942
1025
- The demux file couldn't be read
943
1026
- All samples are removed
944
1027
"""
945
- ar = self .artifact
946
-
947
1028
dir_not_exists = not isdir (self .full_ebi_dir )
1029
+ missing_samples = []
948
1030
if dir_not_exists or rewrite_fastq :
949
1031
makedirs (self .full_ebi_dir )
950
1032
951
- # An artifact will hold only one file of type `preprocessed_demux`
952
- # Thus, we only use the first one (the only one present)
953
- demux = [path for _ , path , ftype in ar .filepaths
954
- if ftype == 'preprocessed_demux' ][0 ]
955
-
956
- demux_samples = set ()
957
- with open_file (demux ) as demux_fh :
958
- if not isinstance (demux_fh , File ):
959
- error_msg = "'%s' doesn't look like a demux file" % demux
960
- LogEntry .create ('Runtime' , error_msg )
961
- raise EBISubmissionError (error_msg )
962
- for s , i in to_per_sample_ascii (demux_fh ,
963
- self .prep_template .keys ()):
964
- sample_fp = self .sample_demux_fps [s ]
965
- wrote_sequences = False
966
- with GzipFile (sample_fp , mode = 'w' , mtime = mtime ) as fh :
967
- for record in i :
968
- fh .write (record )
969
- wrote_sequences = True
970
-
971
- if wrote_sequences :
972
- demux_samples .add (s )
973
- else :
974
- del (self .samples [s ])
975
- del (self .samples_prep [s ])
976
- del (self .sample_demux_fps [s ])
977
- remove (sample_fp )
1033
+ if self .artifact .artifact_type == 'per_sample_FASTQ' :
1034
+ demux_samples , missing_samples = \
1035
+ self ._generate_demultiplexed_fastq_per_sample_FASTQ ()
1036
+ else :
1037
+ demux_samples = self ._generate_demultiplexed_fastq_demux (mtime )
978
1038
else :
979
1039
demux_samples = set ()
980
1040
extension = '.fastq.gz'
@@ -984,8 +1044,10 @@ def generate_demultiplexed_fastq(self, rewrite_fastq=False, mtime=None):
984
1044
if isfile (fpath ) and f .endswith (extension ):
985
1045
demux_samples .add (f [:- extension_len ])
986
1046
987
- missing_samples = set (self .samples .keys ()).difference (
988
- set (demux_samples ))
1047
+ missing_samples = set (
1048
+ self .samples .keys ()).difference (demux_samples )
1049
+
1050
+ if missing_samples :
989
1051
for ms in missing_samples :
990
1052
del (self .samples [ms ])
991
1053
del (self .samples_prep [ms ])
@@ -997,4 +1059,5 @@ def generate_demultiplexed_fastq(self, rewrite_fastq=False, mtime=None):
997
1059
"do not match." )
998
1060
LogEntry .create ('Runtime' , error_msg )
999
1061
raise EBISubmissionError (error_msg )
1062
+
1000
1063
return demux_samples
0 commit comments