diff --git a/CHANGELOG.md b/CHANGELOG.md index 96069b7913..d1e27f7698 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -25,6 +25,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - [#1335](https://github.com/nf-core/sarek/pull/1335) - Add docs and validation for bcftools annotation parameters - [#1345](https://github.com/nf-core/sarek/pull/1345) - Preserve STDERR for easier debugging - [#1351](https://github.com/nf-core/sarek/pull/1351) - Fix params name for test profiles (`bcftools_annotations`) +- [#1364](https://github.com/nf-core/sarek/pull/1364) - Fixed bug where samples were dropped while reconstituting BAM files ### Removed diff --git a/nextflow_schema.json b/nextflow_schema.json index 3f3f21313f..a7f7ed7065 100644 --- a/nextflow_schema.json +++ b/nextflow_schema.json @@ -65,11 +65,22 @@ "default": "", "properties": { "split_fastq": { + "oneOf": [ + { + "type": "integer", + "minimum": 250 + }, + { + "type": "integer", + "minimum": 0, + "maximum": 0 + } + ], "type": "integer", "default": 50000000, "fa_icon": "fas fa-clock", "description": "Specify how many reads each split of a FastQ file contains. Set 0 to turn off splitting at all.", - "help_text": "Use the the tool FastP to split FASTQ file by number of reads. This parallelizes across fastq file shards speeding up mapping. " + "help_text": "Use the tool FastP to split FASTQ file by number of reads. This parallelizes across fastq file shards speeding up mapping. Note that although the minimum value is 250 reads, if you have fewer than 250 reads a single FASTQ shard will still be created." 
}, "wes": { "type": "boolean", diff --git a/workflows/sarek.nf b/workflows/sarek.nf index a18d3864b3..7a90bd6757 100644 --- a/workflows/sarek.nf +++ b/workflows/sarek.nf @@ -471,7 +471,7 @@ workflow SAREK { if (params.split_fastq) { reads_for_alignment = FASTP.out.reads.map{ meta, reads -> read_files = reads.sort(false) { a,b -> a.getName().tokenize('.')[0] <=> b.getName().tokenize('.')[0] }.collate(2) - [ meta + [ size:read_files.size() ], read_files ] + [ meta + [ n_fastq: read_files.size() ], read_files ] }.transpose() } else reads_for_alignment = FASTP.out.reads @@ -482,34 +482,61 @@ workflow SAREK { } // STEP 1: MAPPING READS TO REFERENCE GENOME - // reads will be sorted - reads_for_alignment = reads_for_alignment.map{ meta, reads -> - // Update meta.id to meta.sample no multiple lanes or splitted fastqs - if (meta.size * meta.num_lanes == 1) [ meta + [ id:meta.sample ], reads ] - else [ meta, reads ] - } + // First, we must count the FASTQ inputs (one channel entry per lane and per split chunk) for each sample (meta.n_fastq) + // This is needed to group reads from the same sample together using groupKey to avoid stalling the workflow + // when reads from different samples are mixed together + reads_for_alignment.map { meta, reads -> + [ meta.subMap('patient', 'sample', 'sex', 'status'), reads ] + } + .groupTuple() + .map { meta, reads -> + meta + [ n_fastq: reads.size() ] // We can drop the FASTQ files now that we know how many there are + } + .set { reads_grouping_key } + // reads will be sorted sort_bam = true FASTQ_ALIGN_BWAMEM_MEM2_DRAGMAP_SENTIEON(reads_for_alignment, index_alignement, sort_bam, fasta, fasta_fai) // Grouping the bams from the same samples not to stall the workflow - bam_mapped = FASTQ_ALIGN_BWAMEM_MEM2_DRAGMAP_SENTIEON.out.bam.map{ meta, bam -> - - // Update meta.id to be meta.sample, ditching sample-lane that is not needed anymore - // Update meta.data_type - // Remove no longer necessary fields: - // read_group: Now in the BAM header - // num_lanes: only needed for mapping - // 
size: only needed for mapping - - // Use groupKey to make sure that the correct group can advance as soon as it is complete - // and not stall the workflow until all reads from all channels are mapped - [ groupKey( meta - meta.subMap('num_lanes', 'read_group', 'size') + [ data_type:'bam', id:meta.sample ], (meta.num_lanes ?: 1) * (meta.size ?: 1)), bam ] - }.groupTuple() - - bai_mapped = FASTQ_ALIGN_BWAMEM_MEM2_DRAGMAP_SENTIEON.out.bai.map{ meta, bai -> - [ groupKey( meta - meta.subMap('num_lanes', 'read_group', 'size') + [ data_type:'bai', id:meta.sample ], (meta.num_lanes ?: 1) * (meta.size ?: 1)), bai ] - }.groupTuple() + // Use groupKey to make sure that the correct group can advance as soon as it is complete + // and not stall the workflow until all reads from all channels are mapped + bam_mapped = FASTQ_ALIGN_BWAMEM_MEM2_DRAGMAP_SENTIEON.out.bam + .combine(reads_grouping_key) // Creates a tuple of [ meta, bam, reads_grouping_key ] + .filter { meta1, bam, meta2 -> meta1.sample == meta2.sample } + // Add n_fastq and other variables to meta + .map { meta1, bam, meta2 -> + [ meta1 + meta2, bam ] + } + // Manipulate meta map to remove old fields and add new ones + .map { meta, bam -> + [ meta - meta.subMap('id', 'read_group', 'data_type', 'num_lanes', 'size') + [ data_type: 'bam', id: meta.sample ], bam ] + } + // Create groupKey from meta map + .map { meta, bam -> + [ groupKey( meta, meta.n_fastq), bam ] + } + // Group + .groupTuple() + + bai_mapped = FASTQ_ALIGN_BWAMEM_MEM2_DRAGMAP_SENTIEON.out.bai + .combine(reads_grouping_key) // Creates a tuple of [ meta, bai, reads_grouping_key ] + .filter { meta1, bai, meta2 -> meta1.sample == meta2.sample } + // Add n_fastq and other variables to meta + .map { meta1, bai, meta2 -> + [ meta1 + meta2, bai ] + } + // Manipulate meta map to remove old fields and add new ones + .map { meta, bai -> + [ meta - meta.subMap('id', 'read_group', 'data_type', 'num_lanes', 'size') + [ data_type: 'bai', id: 
meta.sample ], bai ] + } + // Create groupKey from meta map + .map { meta, bai -> + [ groupKey( meta, meta.n_fastq), bai ] + } + // Group + .groupTuple() + // gatk4 markduplicates can handle multiple bams as input, so no need to merge/index here // Except if and only if save_mapped or (skipping markduplicates and sentieon-dedup)