@@ -538,14 +538,14 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti
     val initialMapStatus1 = mapOutputTracker.shuffleStatuses(firstShuffleId).mapStatuses
     // val initialMapStatus1 = mapOutputTracker.mapStatuses.get(0).get
     assert(initialMapStatus1.count(_ != null) === 3)
-    assert(initialMapStatus1.map{_.location.executorId}.toSet ===
+    assert(initialMapStatus1.map{_.location.asInstanceOf[BlockManagerId].executorId}.toSet ===
       Set("hostA-exec1", "hostA-exec2", "hostB-exec"))
     assert(initialMapStatus1.map{_.mapId}.toSet === Set(5, 6, 7))
 
     val initialMapStatus2 = mapOutputTracker.shuffleStatuses(secondShuffleId).mapStatuses
     // val initialMapStatus1 = mapOutputTracker.mapStatuses.get(0).get
     assert(initialMapStatus2.count(_ != null) === 3)
-    assert(initialMapStatus2.map{_.location.executorId}.toSet ===
+    assert(initialMapStatus2.map{_.location.asInstanceOf[BlockManagerId].executorId}.toSet ===
       Set("hostA-exec1", "hostA-exec2", "hostB-exec"))
     assert(initialMapStatus2.map{_.mapId}.toSet === Set(8, 9, 10))
 
@@ -561,13 +561,13 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti
 
     val mapStatus1 = mapOutputTracker.shuffleStatuses(firstShuffleId).mapStatuses
     assert(mapStatus1.count(_ != null) === 1)
-    assert(mapStatus1(2).location.executorId === "hostB-exec")
-    assert(mapStatus1(2).location.host === "hostB")
+    assert(mapStatus1(2).location.asInstanceOf[BlockManagerId].executorId === "hostB-exec")
+    assert(mapStatus1(2).location.asInstanceOf[BlockManagerId].host === "hostB")
 
     val mapStatus2 = mapOutputTracker.shuffleStatuses(secondShuffleId).mapStatuses
     assert(mapStatus2.count(_ != null) === 1)
-    assert(mapStatus2(2).location.executorId === "hostB-exec")
-    assert(mapStatus2(2).location.host === "hostB")
+    assert(mapStatus2(2).location.asInstanceOf[BlockManagerId].executorId === "hostB-exec")
+    assert(mapStatus2(2).location.asInstanceOf[BlockManagerId].host === "hostB")
   }
 
   test("SPARK-32003: All shuffle files for executor should be cleaned up on fetch failure") {
@@ -591,8 +591,10 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti
     // The MapOutputTracker has all the shuffle files
     val mapStatuses = mapOutputTracker.shuffleStatuses(shuffleId).mapStatuses
     assert(mapStatuses.count(_ != null) === 3)
-    assert(mapStatuses.count(s => s != null && s.location.executorId == "hostA-exec") === 2)
-    assert(mapStatuses.count(s => s != null && s.location.executorId == "hostB-exec") === 1)
+    assert(mapStatuses.count(s => s != null &&
+      s.location.asInstanceOf[BlockManagerId].executorId == "hostA-exec") === 2)
+    assert(mapStatuses.count(s => s != null &&
+      s.location.asInstanceOf[BlockManagerId].executorId == "hostB-exec") === 1)
 
     // Now a fetch failure from the lost executor occurs
     complete(taskSets(1), Seq(
@@ -605,8 +607,10 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti
 
     // Shuffle files for hostA-exec should be lost
     assert(mapStatuses.count(_ != null) === 1)
-    assert(mapStatuses.count(s => s != null && s.location.executorId == "hostA-exec") === 0)
-    assert(mapStatuses.count(s => s != null && s.location.executorId == "hostB-exec") === 1)
+    assert(mapStatuses.count(s => s != null &&
+      s.location.asInstanceOf[BlockManagerId].executorId == "hostA-exec") === 0)
+    assert(mapStatuses.count(s => s != null &&
+      s.location.asInstanceOf[BlockManagerId].executorId == "hostB-exec") === 1)
 
     // Additional fetch failure from the executor does not result in further call to
     // mapOutputTracker.removeOutputsOnExecutor
@@ -843,7 +847,8 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti
     // have the 2nd attempt pass
     complete(taskSets(2), Seq((Success, makeMapStatus("hostA", reduceRdd.partitions.length))))
     // we can see both result blocks now
-    assert(mapOutputTracker.getMapSizesByExecutorId(shuffleId, 0).map(_._1.host).toSet ===
+    assert(mapOutputTracker.getMapSizesByExecutorId(shuffleId, 0)
+      .map(_._1.asInstanceOf[BlockManagerId].host).toSet ===
       HashSet("hostA", "hostB"))
     completeAndCheckAnswer(taskSets(3), Seq((Success, 43)), Map(0 -> 42, 1 -> 43))
     assertDataStructuresEmpty()
@@ -1228,7 +1233,8 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti
     submit(reduceRdd, Array(0, 1))
     completeShuffleMapStageSuccessfully(0, 0, reduceRdd.partitions.length)
     // The MapOutputTracker should know about both map output locations.
-    assert(mapOutputTracker.getMapSizesByExecutorId(shuffleId, 0).map(_._1.host).toSet ===
+    assert(mapOutputTracker.getMapSizesByExecutorId(shuffleId, 0)
+      .map(_._1.asInstanceOf[BlockManagerId].host).toSet ===
       HashSet("hostA", "hostB"))
 
     // The first result task fails, with a fetch failure for the output from the first mapper.
@@ -1349,9 +1355,11 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti
 
     completeShuffleMapStageSuccessfully(0, 0, 2)
     // The MapOutputTracker should know about both map output locations.
-    assert(mapOutputTracker.getMapSizesByExecutorId(shuffleId, 0).map(_._1.host).toSet ===
+    assert(mapOutputTracker.getMapSizesByExecutorId(shuffleId, 0)
+      .map(_._1.asInstanceOf[BlockManagerId].host).toSet ===
       HashSet("hostA", "hostB"))
-    assert(mapOutputTracker.getMapSizesByExecutorId(shuffleId, 1).map(_._1.host).toSet ===
+    assert(mapOutputTracker.getMapSizesByExecutorId(shuffleId, 1)
+      .map(_._1.asInstanceOf[BlockManagerId].host).toSet ===
       HashSet("hostA", "hostB"))
 
     // The first result task fails, with a fetch failure for the output from the first mapper.
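The `getMapSizesByExecutorId` hunks apply the same downcast to a tuple element: the first component of each returned pair was previously read as a `BlockManagerId` and now carries the widened location type, so `.map(_._1.host)` becomes `.map(_._1.asInstanceOf[BlockManagerId].host)`. A hedged sketch with the same stand-in types as above (the return shape is assumed for illustration only, not Spark's actual signature):

```scala
// Same illustrative types as the earlier sketch.
trait ShuffleLocation
case class BlockManagerId(executorId: String, host: String) extends ShuffleLocation

object TupleDowncastSketch {
  // Assumed shape: (location, block sizes) pairs; Spark's real method differs.
  def getMapSizesByExecutorId(shuffleId: Int, reduceId: Int): Seq[(ShuffleLocation, Seq[Long])] =
    Seq(
      (BlockManagerId("hostA-exec", "hostA"), Seq(10L)),
      (BlockManagerId("hostB-exec", "hostB"), Seq(20L)))

  def main(args: Array[String]): Unit = {
    // The assertion pattern from the diff: pull the host out of the widened
    // first tuple element via an explicit cast, then compare as a set.
    val hosts = getMapSizesByExecutorId(0, 0).map(_._1.asInstanceOf[BlockManagerId].host).toSet
    assert(hosts == Set("hostA", "hostB"))
  }
}
```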