Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 10 additions & 8 deletions src/main/java/net/preibisch/bigstitcher/spark/SparkFusion.java
Original file line number Diff line number Diff line change
Expand Up @@ -409,9 +409,11 @@ else if (
viewIdsGlobal = Import.getViewIds( dataGlobal );
}

final int[] blocksPerJob = Import.csvStringToIntArray(blockScaleString);
final int[] blocksPerJob = useSharding ? null : Import.csvStringToIntArray(blockScaleString);
System.out.println( "Fusing: " + boundingBox.getTitle() + ": " + Util.printInterval( boundingBox ) +
" with blocksize " + Util.printCoordinates( blockSize ) + " and " + Util.printCoordinates( blocksPerJob ) + " blocks per job/shard" );
" with blocksize " + Util.printCoordinates( blockSize ) + ( useSharding
? " and shard size " + Util.printCoordinates( shardSize )
: " and " + Util.printCoordinates( blocksPerJob ) + " blocks per job" ) );

if ( dataType == DataType.UINT8 )
System.out.println( "Fusing to UINT8, min intensity = " + minIntensity + ", max intensity = " + maxIntensity );
Expand Down Expand Up @@ -809,7 +811,7 @@ else if ( dataType == DataType.UINT16 )
{
final int s = level;

final List<long[][]> allBlocks = Grid.create(
List<long[][]> allBlocks = Grid.create(
new long[] { mrInfo[ level ].dimensions[ 0 ], mrInfo[ level ].dimensions[ 1 ], mrInfo[ level ].dimensions[ 2 ] },
computeBlockSize,
blockSize);
Expand Down Expand Up @@ -868,7 +870,7 @@ else if ( dataType == DataType.UINT16 )

// extract all blocks that failed
final Set<long[][]> failedBlocksSet =
retryTrackerDS.processWithSpark( rddDSResult, grid );
retryTrackerDS.processWithSpark( rddDSResult, allBlocks );

// Use RetryTracker to handle retry counting and removal
if (!retryTrackerDS.processFailures(failedBlocksSet))
Expand All @@ -877,11 +879,11 @@ else if ( dataType == DataType.UINT16 )
System.exit( 1 );
}

// Update grid for next iteration with remaining failed blocks
grid.clear();
grid.addAll(failedBlocksSet);
// Update allBlocks for next iteration with remaining failed blocks
allBlocks.clear();
allBlocks.addAll(failedBlocksSet);
}
while ( grid.size() > 0 );
while ( allBlocks.size() > 0 );

System.out.println( new Date( System.currentTimeMillis() ) + ": Saved level s " + level + ", took: " + (System.currentTimeMillis() - time ) + " ms." );
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Random;
import java.util.concurrent.Callable;
Expand Down Expand Up @@ -63,6 +64,7 @@
import net.imglib2.img.array.ArrayImgs;
import net.imglib2.interpolation.randomaccess.NLinearInterpolatorFactory;
import net.imglib2.neighborsearch.NearestNeighborSearchOnKDTree;
import net.imglib2.neighborsearch.RadiusNeighborSearchOnKDTree;
import net.imglib2.position.FunctionRandomAccessible;
import net.imglib2.realtransform.AffineTransform3D;
import net.imglib2.type.numeric.RealType;
Expand Down Expand Up @@ -822,41 +824,63 @@ public Void call() throws Exception
else
intensitiesList = null;

// combine points since overlapping areas might exist
// combine points since overlapping areas might exist;
// collect all candidates first so we can build the KDTree only once per view
final ArrayList< InterestPoint > candidates = new ArrayList<>();
final ArrayList< Double > candidateIntensities = ( storeIntensities || maxSpots > 0 ) ? new ArrayList<>() : null;

for ( int l = 0; l < ipsList.size(); ++l )
{
final List< InterestPoint > ips = ipsList.get( l );
final List< Double > intensities;
candidates.addAll( ipsList.get( l ) );

if ( storeIntensities || maxSpots > 0 )
intensities = intensitiesList.get( l );
else
intensities = null;
candidateIntensities.addAll( intensitiesList.get( l ) );
}

if ( !overlappingOnly || myIps.size() == 0 )
{
myIps.addAll( ips );
if ( !overlappingOnly || ipsList.size() <= 1 )
{
// no inter-block deduplication needed
myIps.addAll( candidates );

if ( storeIntensities || maxSpots > 0 )
myIntensities.addAll( intensities );
}
else
if ( storeIntensities || maxSpots > 0 )
myIntensities.addAll( candidateIntensities );
}
else if ( !candidates.isEmpty() )
{
// build KDTree ONCE over all candidates and do a single greedy dedup pass;
// earlier-block points always win: for each accepted point, mark all
// later-indexed points within combineDistance as duplicates
final KDTree< InterestPoint > tree = new KDTree<>( candidates, candidates );
final RadiusNeighborSearchOnKDTree< InterestPoint > search = new RadiusNeighborSearchOnKDTree<>( tree );

// map object identity → index in candidates for O(1) lookup after radius search
final IdentityHashMap< InterestPoint, Integer > indexMap = new IdentityHashMap<>( candidates.size() );
for ( int i = 0; i < candidates.size(); ++i )
indexMap.put( candidates.get( i ), i );

final boolean[] isDuplicate = new boolean[ candidates.size() ]; // default false

for ( int i = 0; i < candidates.size(); ++i )
{
final KDTree< InterestPoint > tree = new KDTree<>(myIps, myIps);
final NearestNeighborSearchOnKDTree< InterestPoint > search = new NearestNeighborSearchOnKDTree<>( tree );
if ( isDuplicate[ i ] ) continue;

for ( int i = 0; i < ips.size(); ++i )
search.search( candidates.get( i ), combineDistance, false );
for ( int k = 0; k < search.numNeighbors(); ++k )
{
final InterestPoint ip = ips.get( i );
search.search( ip );
final Integer j = indexMap.get( search.getSampler( k ).get() );
if ( j != null && j > i )
isDuplicate[ j ] = true;
}
}

if ( search.getDistance() > combineDistance )
{
myIps.add( ip );
for ( int i = 0; i < candidates.size(); ++i )
{
if ( !isDuplicate[ i ] )
{
myIps.add( candidates.get( i ) );

if ( storeIntensities || maxSpots > 0 )
myIntensities.add( intensities.get( i ) );
}
if ( storeIntensities || maxSpots > 0 )
myIntensities.add( candidateIntensities.get( i ) );
}
}
}
Expand Down Expand Up @@ -905,9 +929,13 @@ public Void call() throws Exception
final String params = "DOG (Spark) s=" + sigma + " t=" + threshold + " overlappingOnly=" + overlappingOnly + " min=" + findMin + " max=" + findMax +
" downsampleXY=" + downsampleXY + " downsampleZ=" + downsampleZ + " minIntensity=" + minIntensity + " maxIntensity=" + maxIntensity;

long timeAddIP = System.currentTimeMillis();
InterestPointTools.addInterestPoints( dataGlobal, label, interestPoints, params );
System.out.println( "addInterestPoints took " + ( System.currentTimeMillis() - timeAddIP ) + " ms." );

long timeSaveXML = System.currentTimeMillis();
new XmlIoSpimData2().save( dataGlobal, xmlURI );
System.out.println( "Saving XML took " + ( System.currentTimeMillis() - timeSaveXML ) + " ms." );

// store image intensities for interest points
if( storeIntensities )
Expand All @@ -916,6 +944,7 @@ public Void call() throws Exception
{
try
{
long timeIntensities = System.currentTimeMillis();
System.out.println( "Retrieving intensities for interest points '" + label + "' for " + Group.pvid(viewId) + " ... " );

final InterestPointsN5 i = (InterestPointsN5)dataGlobal.getViewInterestPoints().getViewInterestPointLists( viewId ).getInterestPointList( label );
Expand Down Expand Up @@ -952,8 +981,8 @@ public Void call() throws Exception
N5Utils.save( intensityData, n5Writer, datasetIntensities, new int[] { 1, InterestPointsN5.defaultBlockSize }, new ZstandardCompression() );
}

System.out.println( "Saved: " + tempURI + "/" + datasetIntensities );
System.out.println( "Saved: " + tempURI + "/" + datasetIntensities + " (took " + ( System.currentTimeMillis() - timeIntensities ) + " ms)" );

}
catch ( Exception e )
{
Expand Down