Skip to content

Commit 3f01847

Browse files
committed
Merge pull request #2 from kayousterhout/SPARK-3224
Added unit test for SPARK-3224
2 parents 3d3d356 + 796d282 commit 3f01847

File tree

1 file changed

+39
-2
lines changed

1 file changed

+39
-2
lines changed

core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala

Lines changed: 39 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717

1818
package org.apache.spark.scheduler
1919

20-
import scala.collection.mutable.{HashSet, HashMap, Map}
20+
import scala.collection.mutable.{ArrayBuffer, HashSet, HashMap, Map}
2121
import scala.language.reflectiveCalls
2222

2323
import akka.actor._
@@ -98,7 +98,7 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F
9898
val WAIT_TIMEOUT_MILLIS = 10000
9999
val sparkListener = new SparkListener() {
100100
val successfulStages = new HashSet[Int]()
101-
val failedStages = new HashSet[Int]()
101+
val failedStages = new ArrayBuffer[Int]()
102102
override def onStageCompleted(stageCompleted: SparkListenerStageCompleted) {
103103
val stageInfo = stageCompleted.stageInfo
104104
if (stageInfo.failureReason.isEmpty) {
@@ -435,6 +435,43 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F
435435
assertDataStructuresEmpty
436436
}
437437

438+
test("trivial shuffle with multiple fetch failures") {
439+
val shuffleMapRdd = new MyRDD(sc, 2, Nil)
440+
val shuffleDep = new ShuffleDependency(shuffleMapRdd, null)
441+
val shuffleId = shuffleDep.shuffleId
442+
val reduceRdd = new MyRDD(sc, 2, List(shuffleDep))
443+
submit(reduceRdd, Array(0, 1))
444+
complete(taskSets(0), Seq(
445+
(Success, makeMapStatus("hostA", 1)),
446+
(Success, makeMapStatus("hostB", 1))))
447+
// The MapOutputTracker should know about both map output locations.
448+
assert(mapOutputTracker.getServerStatuses(shuffleId, 0).map(_._1.host) ===
449+
Array("hostA", "hostB"))
450+
451+
// The first result task fails, with a fetch failure for the output from the first mapper.
452+
runEvent(CompletionEvent(
453+
taskSets(1).tasks(0),
454+
FetchFailed(makeBlockManagerId("hostA"), shuffleId, 0, 0),
455+
null,
456+
Map[Long, Any](),
457+
null,
458+
null))
459+
assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
460+
assert(sparkListener.failedStages.contains(0))
461+
462+
// The second ResultTask fails, with a fetch failure for the output from the second mapper.
463+
runEvent(CompletionEvent(
464+
taskSets(1).tasks(0),
465+
FetchFailed(makeBlockManagerId("hostA"), shuffleId, 1, 1),
466+
null,
467+
Map[Long, Any](),
468+
null,
469+
null))
470+
// The SparkListener should not receive redundant failure events.
471+
assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
472+
assert(sparkListener.failedStages.size == 1)
473+
}
474+
438475
test("ignore late map task completions") {
439476
val shuffleMapRdd = new MyRDD(sc, 2, Nil)
440477
val shuffleDep = new ShuffleDependency(shuffleMapRdd, null)

0 commit comments

Comments
 (0)