Skip to content

Commit 37ad3b7

Browse files
kayousterhout authored and mateiz committed
[SPARK-1726] [SPARK-2567] Eliminate zombie stages in UI.
Due to problems with when we update runningStages (in DAGScheduler.scala) and how we decide to send a SparkListenerStageCompleted message to SparkListeners, sometimes stages can be shown as "running" in the UI forever (even after they have failed). This issue can manifest when stages are resubmitted with 0 tasks, or when the DAGScheduler catches non-serializable tasks. The problem also resulted in a (small) memory leak in the DAGScheduler, where stages can stay in runningStages forever. This commit fixes that problem and adds a unit test.

Thanks tsudukim for helping to look into this issue!

cc markhamstra rxin

Author: Kay Ousterhout <kayousterhout@gmail.com>

Closes #1566 from kayousterhout/dag_fix and squashes the following commits:

217d74b [Kay Ousterhout] [SPARK-1726] [SPARK-2567] Eliminate zombie stages in UI.
1 parent 47b6b38 commit 37ad3b7

File tree

2 files changed

+76
-65
lines changed

2 files changed

+76
-65
lines changed

core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala

Lines changed: 7 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -710,7 +710,6 @@ class DAGScheduler(
710710
if (missing == Nil) {
711711
logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents")
712712
submitMissingTasks(stage, jobId.get)
713-
runningStages += stage
714713
} else {
715714
for (parent <- missing) {
716715
submitStage(parent)
@@ -753,11 +752,14 @@ class DAGScheduler(
753752
null
754753
}
755754

756-
// must be run listener before possible NotSerializableException
757-
// should be "StageSubmitted" first and then "JobEnded"
758-
listenerBus.post(SparkListenerStageSubmitted(stageToInfos(stage), properties))
759-
760755
if (tasks.size > 0) {
756+
runningStages += stage
757+
// SparkListenerStageSubmitted should be posted before testing whether tasks are
758+
// serializable. If tasks are not serializable, a SparkListenerStageCompleted event
759+
// will be posted, which should always come after a corresponding SparkListenerStageSubmitted
760+
// event.
761+
listenerBus.post(SparkListenerStageSubmitted(stageToInfos(stage), properties))
762+
761763
// Preemptively serialize a task to make sure it can be serialized. We are catching this
762764
// exception here because it would be fairly hard to catch the non-serializable exception
763765
// down the road, where we have several different implementations for local scheduler and

core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala

Lines changed: 69 additions & 60 deletions
Original file line number | Diff line number | Diff line change
@@ -37,6 +37,29 @@ class BuggyDAGEventProcessActor extends Actor {
3737
}
3838
}
3939

40+
/**
41+
* An RDD for passing to DAGScheduler. These RDDs will use the dependencies and
42+
* preferredLocations (if any) that are passed to them. They are deliberately not executable
43+
* so we can test that DAGScheduler does not try to execute RDDs locally.
44+
*/
45+
class MyRDD(
46+
sc: SparkContext,
47+
numPartitions: Int,
48+
dependencies: List[Dependency[_]],
49+
locations: Seq[Seq[String]] = Nil) extends RDD[(Int, Int)](sc, dependencies) with Serializable {
50+
override def compute(split: Partition, context: TaskContext): Iterator[(Int, Int)] =
51+
throw new RuntimeException("should not be reached")
52+
override def getPartitions = (0 until numPartitions).map(i => new Partition {
53+
override def index = i
54+
}).toArray
55+
override def getPreferredLocations(split: Partition): Seq[String] =
56+
if (locations.isDefinedAt(split.index))
57+
locations(split.index)
58+
else
59+
Nil
60+
override def toString: String = "DAGSchedulerSuiteRDD " + id
61+
}
62+
4063
class DAGSchedulerSuiteDummyException extends Exception
4164

4265
class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with FunSuiteLike
@@ -148,34 +171,7 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F
148171
* Type of RDD we use for testing. Note that we should never call the real RDD compute methods.
149172
* This is a pair RDD type so it can always be used in ShuffleDependencies.
150173
*/
151-
type MyRDD = RDD[(Int, Int)]
152-
153-
/**
154-
* Create an RDD for passing to DAGScheduler. These RDDs will use the dependencies and
155-
* preferredLocations (if any) that are passed to them. They are deliberately not executable
156-
* so we can test that DAGScheduler does not try to execute RDDs locally.
157-
*/
158-
private def makeRdd(
159-
numPartitions: Int,
160-
dependencies: List[Dependency[_]],
161-
locations: Seq[Seq[String]] = Nil
162-
): MyRDD = {
163-
val maxPartition = numPartitions - 1
164-
val newRDD = new MyRDD(sc, dependencies) {
165-
override def compute(split: Partition, context: TaskContext): Iterator[(Int, Int)] =
166-
throw new RuntimeException("should not be reached")
167-
override def getPartitions = (0 to maxPartition).map(i => new Partition {
168-
override def index = i
169-
}).toArray
170-
override def getPreferredLocations(split: Partition): Seq[String] =
171-
if (locations.isDefinedAt(split.index))
172-
locations(split.index)
173-
else
174-
Nil
175-
override def toString: String = "DAGSchedulerSuiteRDD " + id
176-
}
177-
newRDD
178-
}
174+
type PairOfIntsRDD = RDD[(Int, Int)]
179175

180176
/**
181177
* Process the supplied event as if it were the top of the DAGScheduler event queue, expecting
@@ -234,19 +230,19 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F
234230
override def taskSucceeded(partition: Int, value: Any) = numResults += 1
235231
override def jobFailed(exception: Exception) = throw exception
236232
}
237-
submit(makeRdd(0, Nil), Array(), listener = fakeListener)
233+
submit(new MyRDD(sc, 0, Nil), Array(), listener = fakeListener)
238234
assert(numResults === 0)
239235
}
240236

241237
test("run trivial job") {
242-
submit(makeRdd(1, Nil), Array(0))
238+
submit(new MyRDD(sc, 1, Nil), Array(0))
243239
complete(taskSets(0), List((Success, 42)))
244240
assert(results === Map(0 -> 42))
245241
assertDataStructuresEmpty
246242
}
247243

248244
test("local job") {
249-
val rdd = new MyRDD(sc, Nil) {
245+
val rdd = new PairOfIntsRDD(sc, Nil) {
250246
override def compute(split: Partition, context: TaskContext): Iterator[(Int, Int)] =
251247
Array(42 -> 0).iterator
252248
override def getPartitions = Array( new Partition { override def index = 0 } )
@@ -260,7 +256,7 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F
260256
}
261257

262258
test("local job oom") {
263-
val rdd = new MyRDD(sc, Nil) {
259+
val rdd = new PairOfIntsRDD(sc, Nil) {
264260
override def compute(split: Partition, context: TaskContext): Iterator[(Int, Int)] =
265261
throw new java.lang.OutOfMemoryError("test local job oom")
266262
override def getPartitions = Array( new Partition { override def index = 0 } )
@@ -274,17 +270,17 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F
274270
}
275271

276272
test("run trivial job w/ dependency") {
277-
val baseRdd = makeRdd(1, Nil)
278-
val finalRdd = makeRdd(1, List(new OneToOneDependency(baseRdd)))
273+
val baseRdd = new MyRDD(sc, 1, Nil)
274+
val finalRdd = new MyRDD(sc, 1, List(new OneToOneDependency(baseRdd)))
279275
submit(finalRdd, Array(0))
280276
complete(taskSets(0), Seq((Success, 42)))
281277
assert(results === Map(0 -> 42))
282278
assertDataStructuresEmpty
283279
}
284280

285281
test("cache location preferences w/ dependency") {
286-
val baseRdd = makeRdd(1, Nil)
287-
val finalRdd = makeRdd(1, List(new OneToOneDependency(baseRdd)))
282+
val baseRdd = new MyRDD(sc, 1, Nil)
283+
val finalRdd = new MyRDD(sc, 1, List(new OneToOneDependency(baseRdd)))
288284
cacheLocations(baseRdd.id -> 0) =
289285
Seq(makeBlockManagerId("hostA"), makeBlockManagerId("hostB"))
290286
submit(finalRdd, Array(0))
@@ -295,8 +291,22 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F
295291
assertDataStructuresEmpty
296292
}
297293

294+
test("unserializable task") {
295+
val unserializableRdd = new MyRDD(sc, 1, Nil) {
296+
class UnserializableClass
297+
val unserializable = new UnserializableClass
298+
}
299+
submit(unserializableRdd, Array(0))
300+
assert(failure.getMessage.startsWith(
301+
"Job aborted due to stage failure: Task not serializable:"))
302+
assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
303+
assert(sparkListener.failedStages.contains(0))
304+
assert(sparkListener.failedStages.size === 1)
305+
assertDataStructuresEmpty
306+
}
307+
298308
test("trivial job failure") {
299-
submit(makeRdd(1, Nil), Array(0))
309+
submit(new MyRDD(sc, 1, Nil), Array(0))
300310
failed(taskSets(0), "some failure")
301311
assert(failure.getMessage === "Job aborted due to stage failure: some failure")
302312
assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
@@ -306,7 +316,7 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F
306316
}
307317

308318
test("trivial job cancellation") {
309-
val rdd = makeRdd(1, Nil)
319+
val rdd = new MyRDD(sc, 1, Nil)
310320
val jobId = submit(rdd, Array(0))
311321
cancel(jobId)
312322
assert(failure.getMessage === s"Job $jobId cancelled ")
@@ -347,8 +357,7 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F
347357
}
348358
dagEventProcessTestActor = TestActorRef[DAGSchedulerEventProcessActor](
349359
Props(classOf[DAGSchedulerEventProcessActor], noKillScheduler))(system)
350-
val rdd = makeRdd(1, Nil)
351-
val jobId = submit(rdd, Array(0))
360+
val jobId = submit(new MyRDD(sc, 1, Nil), Array(0))
352361
cancel(jobId)
353362
// Because the job wasn't actually cancelled, we shouldn't have received a failure message.
354363
assert(failure === null)
@@ -364,10 +373,10 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F
364373
}
365374

366375
test("run trivial shuffle") {
367-
val shuffleMapRdd = makeRdd(2, Nil)
376+
val shuffleMapRdd = new MyRDD(sc, 2, Nil)
368377
val shuffleDep = new ShuffleDependency(shuffleMapRdd, null)
369378
val shuffleId = shuffleDep.shuffleId
370-
val reduceRdd = makeRdd(1, List(shuffleDep))
379+
val reduceRdd = new MyRDD(sc, 1, List(shuffleDep))
371380
submit(reduceRdd, Array(0))
372381
complete(taskSets(0), Seq(
373382
(Success, makeMapStatus("hostA", 1)),
@@ -380,10 +389,10 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F
380389
}
381390

382391
test("run trivial shuffle with fetch failure") {
383-
val shuffleMapRdd = makeRdd(2, Nil)
392+
val shuffleMapRdd = new MyRDD(sc, 2, Nil)
384393
val shuffleDep = new ShuffleDependency(shuffleMapRdd, null)
385394
val shuffleId = shuffleDep.shuffleId
386-
val reduceRdd = makeRdd(2, List(shuffleDep))
395+
val reduceRdd = new MyRDD(sc, 2, List(shuffleDep))
387396
submit(reduceRdd, Array(0, 1))
388397
complete(taskSets(0), Seq(
389398
(Success, makeMapStatus("hostA", 1)),
@@ -406,10 +415,10 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F
406415
}
407416

408417
test("ignore late map task completions") {
409-
val shuffleMapRdd = makeRdd(2, Nil)
418+
val shuffleMapRdd = new MyRDD(sc, 2, Nil)
410419
val shuffleDep = new ShuffleDependency(shuffleMapRdd, null)
411420
val shuffleId = shuffleDep.shuffleId
412-
val reduceRdd = makeRdd(2, List(shuffleDep))
421+
val reduceRdd = new MyRDD(sc, 2, List(shuffleDep))
413422
submit(reduceRdd, Array(0, 1))
414423
// pretend we were told hostA went away
415424
val oldEpoch = mapOutputTracker.getEpoch
@@ -435,9 +444,9 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F
435444
}
436445

437446
test("run shuffle with map stage failure") {
438-
val shuffleMapRdd = makeRdd(2, Nil)
447+
val shuffleMapRdd = new MyRDD(sc, 2, Nil)
439448
val shuffleDep = new ShuffleDependency(shuffleMapRdd, null)
440-
val reduceRdd = makeRdd(2, List(shuffleDep))
449+
val reduceRdd = new MyRDD(sc, 2, List(shuffleDep))
441450
submit(reduceRdd, Array(0, 1))
442451

443452
// Fail the map stage. This should cause the entire job to fail.
@@ -472,13 +481,13 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F
472481
* without shuffleMapRdd1.
473482
*/
474483
test("failure of stage used by two jobs") {
475-
val shuffleMapRdd1 = makeRdd(2, Nil)
484+
val shuffleMapRdd1 = new MyRDD(sc, 2, Nil)
476485
val shuffleDep1 = new ShuffleDependency(shuffleMapRdd1, null)
477-
val shuffleMapRdd2 = makeRdd(2, Nil)
486+
val shuffleMapRdd2 = new MyRDD(sc, 2, Nil)
478487
val shuffleDep2 = new ShuffleDependency(shuffleMapRdd2, null)
479488

480-
val reduceRdd1 = makeRdd(2, List(shuffleDep1))
481-
val reduceRdd2 = makeRdd(2, List(shuffleDep1, shuffleDep2))
489+
val reduceRdd1 = new MyRDD(sc, 2, List(shuffleDep1))
490+
val reduceRdd2 = new MyRDD(sc, 2, List(shuffleDep1, shuffleDep2))
482491

483492
// We need to make our own listeners for this test, since by default submit uses the same
484493
// listener for all jobs, and here we want to capture the failure for each job separately.
@@ -511,10 +520,10 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F
511520
}
512521

513522
test("run trivial shuffle with out-of-band failure and retry") {
514-
val shuffleMapRdd = makeRdd(2, Nil)
523+
val shuffleMapRdd = new MyRDD(sc, 2, Nil)
515524
val shuffleDep = new ShuffleDependency(shuffleMapRdd, null)
516525
val shuffleId = shuffleDep.shuffleId
517-
val reduceRdd = makeRdd(1, List(shuffleDep))
526+
val reduceRdd = new MyRDD(sc, 1, List(shuffleDep))
518527
submit(reduceRdd, Array(0))
519528
// blockManagerMaster.removeExecutor("exec-hostA")
520529
// pretend we were told hostA went away
@@ -534,11 +543,11 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F
534543
}
535544

536545
test("recursive shuffle failures") {
537-
val shuffleOneRdd = makeRdd(2, Nil)
546+
val shuffleOneRdd = new MyRDD(sc, 2, Nil)
538547
val shuffleDepOne = new ShuffleDependency(shuffleOneRdd, null)
539-
val shuffleTwoRdd = makeRdd(2, List(shuffleDepOne))
548+
val shuffleTwoRdd = new MyRDD(sc, 2, List(shuffleDepOne))
540549
val shuffleDepTwo = new ShuffleDependency(shuffleTwoRdd, null)
541-
val finalRdd = makeRdd(1, List(shuffleDepTwo))
550+
val finalRdd = new MyRDD(sc, 1, List(shuffleDepTwo))
542551
submit(finalRdd, Array(0))
543552
// have the first stage complete normally
544553
complete(taskSets(0), Seq(
@@ -563,11 +572,11 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F
563572
}
564573

565574
test("cached post-shuffle") {
566-
val shuffleOneRdd = makeRdd(2, Nil)
575+
val shuffleOneRdd = new MyRDD(sc, 2, Nil)
567576
val shuffleDepOne = new ShuffleDependency(shuffleOneRdd, null)
568-
val shuffleTwoRdd = makeRdd(2, List(shuffleDepOne))
577+
val shuffleTwoRdd = new MyRDD(sc, 2, List(shuffleDepOne))
569578
val shuffleDepTwo = new ShuffleDependency(shuffleTwoRdd, null)
570-
val finalRdd = makeRdd(1, List(shuffleDepTwo))
579+
val finalRdd = new MyRDD(sc, 1, List(shuffleDepTwo))
571580
submit(finalRdd, Array(0))
572581
cacheLocations(shuffleTwoRdd.id -> 0) = Seq(makeBlockManagerId("hostD"))
573582
cacheLocations(shuffleTwoRdd.id -> 1) = Seq(makeBlockManagerId("hostC"))

0 commit comments

Comments
 (0)