
Commit 28d70aa

wip on getting a better test case ...
1 parent a9bf31f commit 28d70aa

File tree

1 file changed: 105 additions & 0 deletions

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.spark.scheduler

import java.util.Date

import scala.collection.mutable.{ArrayBuffer, HashMap}

import org.apache.spark.shuffle.FetchFailedException
import org.apache.spark.storage.BlockManagerId
import org.apache.spark._

class DAGSchedulerFailureRecoverySuite extends SparkFunSuite with Logging {

  // TODO we should run this with a matrix of configurations: different shufflers,
  // external shuffle service, etc.  But that is really pushing the question of how to run
  // such a long test ...

  ignore("no concurrent retries for stage attempts (SPARK-7308)") {
    // see SPARK-7308 for a detailed description of the conditions this is trying to recreate.
    // note that this is somewhat convoluted for a test case, but isn't actually very unusual
    // under a real workload.  We only fail the first attempt of stage 2, but that
    // could be enough to cause havoc.

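    // the bad interleaving is timing-dependent, so repeat the whole scenario for many trials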
    (0 until 100).foreach { idx =>
      println(new Date() + "\ttrial " + idx)
      logInfo(new Date() + "\ttrial " + idx)

      val conf = new SparkConf().set("spark.executor.memory", "100m")
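      // local-cluster[5,4,100]: 5 executors, each with 4 cores and 100 MB of memory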
      val clusterSc = new SparkContext("local-cluster[5,4,100]", "test-cluster", conf)
      val bms = ArrayBuffer[BlockManagerId]()
      val stageFailureCount = HashMap[Int, Int]()
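      // listener records the executor block managers (targets for the faked fetch failures)
      // and counts how many attempts of each stage fail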
      clusterSc.addSparkListener(new SparkListener {
        override def onBlockManagerAdded(bmAdded: SparkListenerBlockManagerAdded): Unit = {
          bms += bmAdded.blockManagerId
        }

        override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = {
          if (stageCompleted.stageInfo.failureReason.isDefined) {
            val stage = stageCompleted.stageInfo.stageId
            stageFailureCount(stage) = stageFailureCount.getOrElse(stage, 0) + 1
            val reason = stageCompleted.stageInfo.failureReason.get
            println("stage " + stage + " failed " + stageFailureCount(stage) + " times: " + reason)
          }
        }
      })
61+
try {
62+
val rawData = clusterSc.parallelize(1 to 1e6.toInt, 20).map { x => (x % 100) -> x }.cache()
63+
rawData.count()
64+
65+
// choose any executor block manager for the fetch failures. Just can't be driver
66+
// to avoid broadcast failures
67+
val someBlockManager = bms.filter{!_.isDriver}(0)
68+
69+
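        // groupByKey adds a shuffle; throwing FetchFailedException from the post-shuffle tasks
        // makes the scheduler treat that block manager's map output as lost and resubmit the
        // map stage, which sets up the concurrent stage-attempt scenario from SPARK-7308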
        val shuffled = rawData.groupByKey(100).mapPartitionsWithIndex { case (idx, itr) =>
          // we want one failure quickly, and more failures after stage 0 has finished its
          // second attempt
          val stageAttemptId = TaskContext.get().asInstanceOf[TaskContextImpl].stageAttemptId
          if (stageAttemptId == 0) {
            if (idx == 0) {
              throw new FetchFailedException(someBlockManager, 0, 0, idx,
                cause = new RuntimeException("simulated fetch failure"))
            } else if (idx > 0 && math.random < 0.2) {
              Thread.sleep(5000)
              throw new FetchFailedException(someBlockManager, 0, 0, idx,
                cause = new RuntimeException("simulated fetch failure"))
            } else {
              // want to make sure plenty of these finish after task 0 fails, and some even finish
              // after the previous stage is retried and this stage retry is started
              Thread.sleep((500 + math.random * 5000).toLong)
            }
          }
          itr.map { x => ((x._1 + 5) % 100) -> x._2 }
        }
        val data = shuffled.mapPartitions { itr => itr.flatMap(_._2) }.collect()
        val count = data.size
        assert(count === 1e6.toInt)
        assert(data.toSet === (1 to 1e6.toInt).toSet)

        assert(stageFailureCount.getOrElse(1, 0) === 0)
        assert(stageFailureCount.getOrElse(2, 0) === 1)
        assert(stageFailureCount.getOrElse(3, 0) === 0)
      } finally {
        clusterSc.stop()
      }
    }
  }
}
