-
Notifications
You must be signed in to change notification settings - Fork 0
/
main.nf
executable file
·62 lines (42 loc) · 2 KB
/
main.nf
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
#!/usr/bin/env nextflow
nextflow.enable.dsl=2
// Helper (non-process) functions: console messages, parameter defaults and
// validation, and the help/version switch handling.
include { help_message; version_message; complete_message; error_message; pipeline_start_message } from './modules/messages.nf'
include { default_params; check_params } from './modules/params_parser.nf'
include { help_or_version } from './modules/params_utilities.nf'
// Pipeline version string; handed to help_or_version and to the start and
// completion messages below.
version = '1.0dev'
// Fetch the default parameter values. Note that this assignment reuses the
// name of the included function: from here on `default_params` refers to the
// returned value (it is used as the left operand of `+` on the next statement).
default_params = default_params()
// Merge defaults with the user-supplied `params`. User params are the
// right-hand operand; with Groovy map addition the right-hand entries win on
// duplicate keys (assumes default_params() returns a Map -- TODO confirm in
// modules/params_parser.nf).
merged_params = default_params + params
// Handle the help/version switches before any validation is attempted.
// NOTE(review): presumably prints the message and exits when requested --
// confirm in modules/params_utilities.nf.
help_or_version(merged_params, version)
// Run the merged params through check_params; the result is the parameter set
// used by the rest of this script (process include, workflow, completion handler).
final_params = check_params(merged_params)
// Announce the start of the pipeline with the version and the final params.
pipeline_start_message(version, final_params)
// Processes are included only now because addParams(final_params) needs the
// final parameter set computed above.
// NOTE(review): recent Nextflow documentation marks `addParams` on include as
// deprecated -- confirm against the Nextflow version this pipeline targets.
include { MINIMAP2_INDEX; MINIMAP2_SAM; SAM_SORT_AND_INDEX; EXTRACT_MICROBIAL_READS; BAM_STATISTICS; COMBINE_BAM_STATISTICS } from './modules/processes.nf' addParams(final_params)
// Entry workflow: feeds the reference and the reads through the MINIMAP2_*,
// SAM_SORT_AND_INDEX, EXTRACT_MICROBIAL_READS and BAM statistics processes
// included from ./modules/processes.nf.
workflow {
    // Reference file(s) matched by final_params.reference_fasta, each emitted
    // as a (baseName, path) tuple. Fails early if nothing matches.
    reference_ch = channel
        .fromPath( final_params.reference_fasta, checkIfExists: true )
        .map { fasta -> tuple(fasta.baseName, fasta) }

    // Read file(s) matched by final_params.reads, each emitted as a
    // (simpleName, path) tuple. Fails early if nothing matches.
    sample_reads_ch = channel
        .fromPath( final_params.reads, checkIfExists: true )
        .map { reads -> tuple(reads.simpleName, reads) }

    MINIMAP2_INDEX(reference_ch)

    // The reads channel and the index channel share no key, so they are
    // paired with `combine` rather than `join`.
    MINIMAP2_SAM( sample_reads_ch.combine(MINIMAP2_INDEX.out.index_ch) )

    SAM_SORT_AND_INDEX(MINIMAP2_SAM.out.sam_ch)

    // The bam_ch output of SAM_SORT_AND_INDEX feeds two downstream processes.
    sorted_bam_ch = SAM_SORT_AND_INDEX.out.bam_ch
    EXTRACT_MICROBIAL_READS(sorted_bam_ch)
    BAM_STATISTICS(sorted_bam_ch)

    // Gather every statistics emission into a single list, ordered by the
    // base name of each item's first element, before combining them.
    by_base_name = { lhs, rhs -> lhs[0].getBaseName() <=> rhs[0].getBaseName() }
    COMBINE_BAM_STATISTICS( BAM_STATISTICS.out.bam_stats_ch.collect( sort: by_base_name ) )
}
// Completion handler: passes the final parameter set, the `workflow` object
// and the pipeline version to complete_message (from ./modules/messages.nf).
workflow.onComplete {
complete_message(final_params, workflow, version)
}
// Error handler: passes the `workflow` object to error_message
// (from ./modules/messages.nf).
workflow.onError {
error_message(workflow)
}