FlowFeatureMapper speed improvements #8982

Draft
wants to merge 14 commits into base: master

@@ -89,6 +89,18 @@ enum MappingFeatureEnum {
@Argument(fullName = "keep-supplementary-alignments", doc = "keep supplementary alignments ?", optional = true)
public boolean keepSupplementaryAlignments = false;

/**
* do not emit edit distance at all
**/
@Argument(fullName = "no-edit-distance", doc = "do not emit edit distance at all ?", optional = true)
public boolean noEditDistance = false;

Collaborator: Do we need this option?

Contributor Author: removed

/**
* edit distance should come from computed NM
**/
@Argument(fullName = "nm-edit-distance", doc = "edit distance should come from computed NM ?", optional = true)
public boolean nmEditDistance = false;

/**
* debug negatives?
**/
@@ -70,7 +70,6 @@ public void forEachOnRead(GATKRead read, ReferenceContext referenceContext, Cons
} else {
basesString = new String(Arrays.copyOfRange(bases, startSoftClip, bases.length - endSoftClip));
}
int refEditDistance = levDistance.apply(basesString, new String(ref));

// count bases delta on M cigar elements
int nonIdentMBases = 0;
@@ -140,11 +139,11 @@ }
}

// add this feature
FlowFeatureMapper.MappedFeature feature = FlowFeatureMapper.MappedFeature.makeSNV(read, readOfs, ref[refOfs], referenceContext.getStart() + refOfs, readOfs - refOfs);
FlowFeatureMapper.MappedFeature feature = FlowFeatureMapper.MappedFeature.makeSNV(read, readOfs, ref[refOfs], referenceContext.getStart() + refOfs, readOfs - refOfs, referenceContext);
if ( (fmArgs.reportAllAlts || fmArgs.tagBasesWithAdjacentRefDiff) && !surrounded )
feature.adjacentRefDiff = true;
feature.nonIdentMBasesOnRead = nonIdentMBases;
feature.refEditDistance = refEditDistance;
feature.refEditDistance = () -> levDistance.apply(basesString, new String(ref));
if ( !read.isReverseStrand() )
feature.index = readOfs;
else
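The hunk above carries one of the speedups: refEditDistance is no longer an int computed eagerly for every candidate feature, but a deferred computation that only runs if the value is actually consumed. Below is a minimal, self-contained sketch of that lazy-supplier idea, assuming Apache Commons Text's LevenshteinDistance for the distance itself; the LazyFeatureSketch class is illustrative only and not part of the PR.

import java.util.function.Supplier;
import org.apache.commons.text.similarity.LevenshteinDistance;

// Illustrative stand-in for FlowFeatureMapper.MappedFeature, showing only the lazy edit distance.
class LazyFeatureSketch {

    // the edit distance is held as a Supplier, so the (expensive) Levenshtein
    // computation runs only if and when a consumer asks for the value
    Supplier<Integer> refEditDistance;

    static LazyFeatureSketch of(final String readBases, final String refBases) {
        final LevenshteinDistance levDistance = new LevenshteinDistance();
        final LazyFeatureSketch feature = new LazyFeatureSketch();
        feature.refEditDistance = () -> levDistance.apply(readBases, refBases); // captured, not yet computed
        return feature;
    }

    public static void main(String[] args) {
        final LazyFeatureSketch feature = of("ACGTT", "ACGTA");
        // the distance is computed here, on first use
        System.out.println("refEditDistance = " + feature.refEditDistance.get());
    }
}

The trade-off is that consumers now call the supplier instead of reading a plain field, and the value is recomputed on every call unless it is memoized.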
@@ -0,0 +1,116 @@
package org.broadinstitute.hellbender.tools.walkers.featuremapping;

import org.broadinstitute.barclay.argparser.Argument;
import org.broadinstitute.hellbender.engine.FeatureContext;
import org.broadinstitute.hellbender.engine.ReadWalker;
import org.broadinstitute.hellbender.engine.ReferenceContext;
import org.broadinstitute.hellbender.exceptions.GATKException;
import org.broadinstitute.hellbender.utils.read.GATKRead;

import java.util.concurrent.LinkedBlockingQueue;

public abstract class ThreadedReadWalker extends ReadWalker implements Runnable {

public static final int CAPACITY = 5000;

// apply() message and queue - carrying read/referenceContext tuples
static class ApplyMessage {
GATKRead read;
ReferenceContext referenceContext;
FeatureContext featureContext;
}

private LinkedBlockingQueue<ApplyMessage> applyQueue;
private Thread worker;

/**
* turn threaded walker on
**/
@Argument(fullName = "threaded-walker", doc = "turn threaded walker on?", optional = true)
public boolean threadedWalker = false;


@Override
public void onTraversalStart() {

// if running in threaded mode, start worker thread
if ( threadedWalker ) {
applyQueue = new LinkedBlockingQueue<>(CAPACITY);
worker = new Thread(this);
worker.setUncaughtExceptionHandler(getUncaughtExceptionHandler());
worker.start();
}
}

@Override
public void closeTool() {

// if running in threaded mode, signal end of input and wait for thread to complete
if ( threadedWalker ) {
try {
applyQueue.put(new ApplyMessage());
worker.join();
} catch (InterruptedException e) {
throw new GATKException("interrupted while waiting for the worker thread to finish", e);
}
}
}

@Override
final public void apply(final GATKRead read, final ReferenceContext referenceContext, final FeatureContext featureContext) {

if ( !acceptRead(read, referenceContext, featureContext) ) {
return;
}

if ( threadedWalker ) {
// redirect message into queue
ApplyMessage message = new ApplyMessage();
message.read = read;
message.referenceContext = referenceContext;
message.featureContext = featureContext;
try {
applyQueue.put(message);
} catch (InterruptedException e) {
throw new GATKException("interrupted while queueing a read for the worker thread", e);
}
} else {
applyPossiblyThreaded(read, referenceContext, featureContext);
}
}

@Override
public void run() {

// loop on messages
ApplyMessage message;
while ( true ) {
// take next message off the queue
try {
message = applyQueue.take();
} catch (InterruptedException e) {
throw new GATKException("interrupted while waiting for the next read message", e);
}

// end of stream?
if ( message.read == null ) {
break;
}

// process message
applyPossiblyThreaded(message.read, message.referenceContext, message.featureContext);
}
}

public abstract boolean acceptRead(GATKRead read, ReferenceContext referenceContext, FeatureContext featureContext);
public abstract void applyPossiblyThreaded(final GATKRead read, final ReferenceContext referenceContext, final FeatureContext featureContext);

public Thread.UncaughtExceptionHandler getUncaughtExceptionHandler() {
Thread.UncaughtExceptionHandler handler = (th, ex) -> {
System.out.println("Uncaught exception: " + ex);
ex.printStackTrace(System.out);
System.exit(-1);
};
return handler;
}
}
Collaborator: I imagine that these changes are going to raise some questions. Can you please add description in the PR?
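ThreadedReadWalker leaves the per-read work to subclasses, so a hypothetical subclass may make the division of labor clearer: acceptRead runs on the traversal thread and keeps cheap filtering out of the queue, while applyPossiblyThreaded runs on the worker thread when --threaded-walker is set. The CountingThreadedWalker below is illustrative only, is not part of this PR, and omits the usual @CommandLineProgramProperties annotation.

import java.util.concurrent.atomic.AtomicLong;

import org.broadinstitute.hellbender.engine.FeatureContext;
import org.broadinstitute.hellbender.engine.ReferenceContext;
import org.broadinstitute.hellbender.tools.walkers.featuremapping.ThreadedReadWalker;
import org.broadinstitute.hellbender.utils.read.GATKRead;

// Illustrative subclass: counts mapped reads, doing the per-read work on the
// worker thread when --threaded-walker is set (and inline otherwise).
public class CountingThreadedWalker extends ThreadedReadWalker {

    private final AtomicLong mappedReads = new AtomicLong();

    @Override
    public boolean acceptRead(final GATKRead read, final ReferenceContext referenceContext, final FeatureContext featureContext) {
        // cheap filtering stays on the traversal thread, so rejected reads never enter the queue
        return !read.isUnmapped();
    }

    @Override
    public void applyPossiblyThreaded(final GATKRead read, final ReferenceContext referenceContext, final FeatureContext featureContext) {
        // the (potentially expensive) per-read work; runs on the worker thread in threaded mode
        mappedReads.incrementAndGet();
    }

    @Override
    public void closeTool() {
        super.closeTool(); // joins the worker thread, so the count below is final
        logger.info("mapped reads: " + mappedReads.get());
    }
}

Note that anything accumulated in applyPossiblyThreaded is only complete after super.closeTool() has joined the worker thread, which is why the sketch reports the count from closeTool() rather than onTraversalSuccess().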

@@ -49,6 +49,9 @@ public final class CachingIndexedFastaSequenceFile implements ReferenceSequenceFile
/** The default cache size in bp */
public static final long DEFAULT_CACHE_SIZE = 1000000;

/** Requested cache size in bp - will be respected if set */
public static long requestedCacheSize = 0;

/** The cache size of this CachingIndexedFastaSequenceFile */
private final long cacheSize;

@@ -236,9 +239,14 @@ public long getCacheMisses() {
* @return the size of the cache we are using
*/
public long getCacheSize() {
return cacheSize;
return requestedCacheSize != 0 ? requestedCacheSize : cacheSize;
}

private long getCacheMissBackup() {
return Math.max(getCacheSize() / 1000, 1);
}


/**
* Is this CachingIndexedFastaReader keeping the original case of bases in the fasta, or is
* everything being made upper case?
@@ -315,10 +323,10 @@ public ReferenceSequence getSequence(String contig) {
* all of the bases in the ReferenceSequence returned by this method will be upper cased.
*/
@Override
public ReferenceSequence getSubsequenceAt( final String contig, long start, final long stop ) {
public synchronized ReferenceSequence getSubsequenceAt( final String contig, long start, final long stop ) {
final ReferenceSequence result;

if ( (stop - start + 1) > cacheSize ) {
if ( (stop - start + 1) > getCacheSize() ) {
cacheMisses++;
result = sequenceFile.getSubsequenceAt(contig, start, stop);
if ( ! preserveCase ) StringUtil.toUpperCase(result.getBases());
@@ -335,8 +343,8 @@ public ReferenceSequence getSubsequenceAt( final String contig, long start, final long stop ) {

if ( start < cache.start || stop > cache.stop || cache.seq == null || cache.seq.getContigIndex() != contigInfo.getSequenceIndex() ) {
cacheMisses++;
cache.start = Math.max(start - cacheMissBackup, 0);
cache.stop = Math.min(start + cacheSize + cacheMissBackup, contigInfo.getSequenceLength());
cache.start = Math.max(start - getCacheMissBackup(), 0);
cache.stop = Math.min(start + getCacheSize() + getCacheMissBackup(), contigInfo.getSequenceLength());
cache.seq = sequenceFile.getSubsequenceAt(contig, cache.start, cache.stop);

// convert all of the bases in the sequence to upper case if we aren't preserving cases
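For context on the reference-cache changes above: getSubsequenceAt() is now synchronized so the shared cache can be queried from the walker's worker thread, and the static requestedCacheSize lets a caller enlarge the cached window beyond the 1 Mbp default. Below is a hedged sketch of how such an override might be used, assuming the Path-based constructor of CachingIndexedFastaSequenceFile and htsjdk's ReferenceSequence; the cache size, contig, and fasta path are arbitrary examples.

import java.nio.file.Paths;

import htsjdk.samtools.reference.ReferenceSequence;
import org.broadinstitute.hellbender.utils.fasta.CachingIndexedFastaSequenceFile;

public class ReferenceCacheSketch {
    public static void main(String[] args) throws Exception {
        // global override: ask for a 10 Mbp cache window instead of the 1 Mbp default,
        // so consecutive nearby lookups are served from one cached window instead of re-reading the fasta
        CachingIndexedFastaSequenceFile.requestedCacheSize = 10_000_000L;

        try (CachingIndexedFastaSequenceFile ref =
                     new CachingIndexedFastaSequenceFile(Paths.get("Homo_sapiens_assembly38.fasta"))) {
            // safe to call from multiple threads now that getSubsequenceAt() is synchronized
            ReferenceSequence window = ref.getSubsequenceAt("chr1", 1_000_000, 1_001_000);
            System.out.println("fetched " + window.length() + " bases");
        }
    }
}

Because requestedCacheSize is a global static, setting it affects every CachingIndexedFastaSequenceFile in the process; that is presumably acceptable for a single-tool run, but it is worth keeping in mind.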
@@ -220,4 +220,139 @@ public void testTagBasesWithAdjacentRefDiff() throws IOException {
}
}

@Test
public void testNmEditDistance() throws IOException {

final File outputDir = createTempDir("testFlowFeatureMapperTest");
final File expectedFile = new File(testDir + "/snv_feature_mapper_nm_edit_disance_output.vcf");
final File outputFile = UPDATE_EXACT_MATCH_EXPECTED_OUTPUTS ? expectedFile : new File(outputDir + "/snv_feature_mapper_nm_edit_disance_output.vcf");

final String[] args = new String[] {
"-R", largeFileTestDir + "/Homo_sapiens_assembly38.fasta.gz",
"-O", outputFile.getAbsolutePath(),
"-I", testDir + "/snv_feature_mapper_input.bam",
"--copy-attr", "RG",
"--copy-attr", "AS,Integer,AS attribute, as copied",
"--copy-attr", "rq,Float",
"--limit-score", "100",
"--min-score", "0",
"--snv-identical-bases", "10",
"--debug-negatives", "false",
"--debug-read-name", "150451-BC94-0645901755",
"--nm-edit-distance"
};

// run the tool
runCommandLine(args); // no assert, just make sure we don't throw

// make sure we've generated the output file
Assert.assertTrue(outputFile.exists());

// walk the output and expected files, compare non-comment lines
if ( !UPDATE_EXACT_MATCH_EXPECTED_OUTPUTS ) {
IntegrationTestSpec.assertEqualTextFiles(outputFile, expectedFile, "#");
}
}

@Test
public void testNoEditDistance() throws IOException {

final File outputDir = createTempDir("testFlowFeatureMapperTest");
final File expectedFile = new File(testDir + "/snv_feature_mapper_no_edit_disance_output.vcf");
final File outputFile = UPDATE_EXACT_MATCH_EXPECTED_OUTPUTS ? expectedFile : new File(outputDir + "/snv_feature_mapper_no_edit_disance_output.vcf");

final String[] args = new String[] {
"-R", largeFileTestDir + "/Homo_sapiens_assembly38.fasta.gz",
"-O", outputFile.getAbsolutePath(),
"-I", testDir + "/snv_feature_mapper_input.bam",
"--copy-attr", "RG",
"--copy-attr", "AS,Integer,AS attribute, as copied",
"--copy-attr", "rq,Float",
"--limit-score", "100",
"--min-score", "0",
"--snv-identical-bases", "10",
"--debug-negatives", "false",
"--debug-read-name", "150451-BC94-0645901755",
"--no-edit-distance"
};

// run the tool
runCommandLine(args); // no assert, just make sure we don't throw

// make sure we've generated the output file
Assert.assertTrue(outputFile.exists());

// walk the output and expected files, compare non-comment lines
if ( !UPDATE_EXACT_MATCH_EXPECTED_OUTPUTS ) {
IntegrationTestSpec.assertEqualTextFiles(outputFile, expectedFile, "#");
}
}

@Test
public void testMapperThread() throws IOException {

final File outputDir = createTempDir("testFlowFeatureMapperTest");
final File expectedFile = new File(testDir + "/snv_feature_mapper_output.vcf");
final File outputFile = UPDATE_EXACT_MATCH_EXPECTED_OUTPUTS ? expectedFile : new File(outputDir + "/snv_feature_mapper_output.vcf");

final String[] args = new String[] {
"-R", largeFileTestDir + "/Homo_sapiens_assembly38.fasta.gz",
"-O", outputFile.getAbsolutePath(),
"-I", testDir + "/snv_feature_mapper_input.cram",
"--copy-attr", "RG",
"--copy-attr", "AS,Integer,AS attribute, as copied",
"--copy-attr", "rq,Float",
"--limit-score", "100",
"--min-score", "0",
"--snv-identical-bases", "10",
"--debug-negatives", "false",
"--debug-read-name", "150451-BC94-0645901755",
"--threaded-walker"
};

// run the tool
runCommandLine(args); // no assert, just make sure we don't throw

// make sure we've generated the output file
Assert.assertTrue(outputFile.exists());

// walk the output and expected files, compare non-comment lines
if ( !UPDATE_EXACT_MATCH_EXPECTED_OUTPUTS ) {
IntegrationTestSpec.assertEqualTextFiles(outputFile, expectedFile, "#");
}
}

@Test
public void testMapperWriterThreaded() throws IOException {

final File outputDir = createTempDir("testFlowFeatureMapperTest");
final File expectedFile = new File(testDir + "/snv_feature_mapper_output.vcf");
final File outputFile = UPDATE_EXACT_MATCH_EXPECTED_OUTPUTS ? expectedFile : new File(outputDir + "/snv_feature_mapper_output.vcf");

final String[] args = new String[] {
"-R", largeFileTestDir + "/Homo_sapiens_assembly38.fasta.gz",
"-O", outputFile.getAbsolutePath(),
"-I", testDir + "/snv_feature_mapper_input.cram",
"--copy-attr", "RG",
"--copy-attr", "AS,Integer,AS attribute, as copied",
"--copy-attr", "rq,Float",
"--limit-score", "100",
"--min-score", "0",
"--snv-identical-bases", "10",
"--debug-negatives", "false",
"--debug-read-name", "150451-BC94-0645901755",
"--threaded-writer"
};

// run the tool
runCommandLine(args); // no assert, just make sure we don't throw

// make sure we've generated the output file
Assert.assertTrue(outputFile.exists());

// walk the output and expected files, compare non-comment lines
if ( !UPDATE_EXACT_MATCH_EXPECTED_OUTPUTS ) {
IntegrationTestSpec.assertEqualTextFiles(outputFile, expectedFile, "#");
}
}
}
6 Git LFS files not shown.