@@ -558,6 +558,7 @@ class DAGSchedulerSuite
558
558
/** This tests the case where another FetchFailed comes in while the map stage is getting
559
559
* re-run. */
560
560
test(" late fetch failures don't cause multiple concurrent attempts for the same map stage" ) {
561
+ println(" begin late fetch failure" )
561
562
val shuffleMapRdd = new MyRDD (sc, 2 , Nil )
562
563
val shuffleDep = new ShuffleDependency (shuffleMapRdd, null )
563
564
val shuffleId = shuffleDep.shuffleId
@@ -572,13 +573,15 @@ class DAGSchedulerSuite
572
573
// The map stage should have been submitted.
573
574
assert(countSubmittedMapStageAttempts() === 1 )
574
575
576
+ println(" late fetch failure: taskSets = " + taskSets)
575
577
complete(taskSets(0 ), Seq (
576
578
(Success , makeMapStatus(" hostA" , 1 )),
577
579
(Success , makeMapStatus(" hostB" , 1 ))))
578
580
// The MapOutputTracker should know about both map output locations.
579
581
assert(mapOutputTracker.getServerStatuses(shuffleId, 0 ).map(_._1.host) ===
580
582
Array (" hostA" , " hostB" ))
581
583
584
+ println(" late fetch failure: taskSets = " + taskSets)
582
585
// The first result task fails, with a fetch failure for the output from the first mapper.
583
586
runEvent(CompletionEvent (
584
587
taskSets(1 ).tasks(0 ),
@@ -622,6 +625,7 @@ class DAGSchedulerSuite
622
625
*/
623
626
test(" extremely late fetch failures don't cause multiple concurrent attempts for " +
624
627
" the same stage" ) {
628
+ println(" begin extremely late fetch failure" )
625
629
val shuffleMapRdd = new MyRDD (sc, 2 , Nil )
626
630
val shuffleDep = new ShuffleDependency (shuffleMapRdd, null )
627
631
val shuffleId = shuffleDep.shuffleId
@@ -639,6 +643,7 @@ class DAGSchedulerSuite
639
643
sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS )
640
644
assert(countSubmittedMapStageAttempts() === 1 )
641
645
646
+ println(" extremely late fetch failure: taskSets = " + taskSets)
642
647
// Complete the map stage.
643
648
complete(taskSets(0 ), Seq (
644
649
(Success , makeMapStatus(" hostA" , 1 )),
@@ -648,6 +653,7 @@ class DAGSchedulerSuite
648
653
sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS )
649
654
assert(countSubmittedReduceStageAttempts() === 1 )
650
655
656
+ println(" extremely late fetch failure: taskSets = " + taskSets)
651
657
// The first result task fails, with a fetch failure for the output from the first mapper.
652
658
runEvent(CompletionEvent (
653
659
taskSets(1 ).tasks(0 ),
0 commit comments