[SPARK-1726] [SPARK-2567] Eliminate zombie stages in UI. #1566

Status: Closed (wants to merge 1 commit)
core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -710,7 +710,6 @@ class DAGScheduler(
if (missing == Nil) {
logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents")
submitMissingTasks(stage, jobId.get)
-        runningStages += stage
} else {
Comment from the PR author:

So just to clarify what's going on here: prior to my change, we added a stage to runningStages here, after calling submitMissingTasks (so after the code I modified below gets executed). This could lead to a memory leak (if the stage needed to be aborted in submitMissingTasks, due to a NotSerializableException for example, because then it would never be removed from runningStages). It also meant that the DAGScheduler sent a SparkListenerStageSubmitted event to the UI, but never a SparkListenerStageCompleted (because, on line 1072, we only send a SparkListenerStageCompleted event if the stage is in runningStages).

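To make the leak concrete, here is a minimal runnable sketch (not Spark's actual listener API; `StageSubmitted`, `StageCompleted`, and `StageTrackingListener` are simplified stand-ins for SparkListenerStageSubmitted, SparkListenerStageCompleted, and the UI's bookkeeping):

```scala
import scala.collection.mutable

// Simplified stand-ins for SparkListenerStageSubmitted / SparkListenerStageCompleted.
case class StageSubmitted(stageId: Int)
case class StageCompleted(stageId: Int)

// Mirrors the UI's bookkeeping: a stage becomes "active" on submission and is
// removed again only when a completion event arrives.
class StageTrackingListener {
  val activeStages = mutable.Set.empty[Int]
  def onStageSubmitted(event: StageSubmitted): Unit = activeStages += event.stageId
  def onStageCompleted(event: StageCompleted): Unit = activeStages -= event.stageId
}

object ZombieStageDemo extends App {
  val listener = new StageTrackingListener
  listener.onStageSubmitted(StageSubmitted(0))
  // If the stage then aborts (e.g. on a NotSerializableException) without ever
  // reaching runningStages, no completion event is posted and stage 0 stays
  // "active" forever: a zombie stage in the UI.
  println(listener.activeStages) // prints: Set(0)
}
```
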
for (parent <- missing) {
submitStage(parent)
@@ -753,11 +752,14 @@ class DAGScheduler(
null
}

-    // must be run listener before possible NotSerializableException
-    // should be "StageSubmitted" first and then "JobEnded"
-    listenerBus.post(SparkListenerStageSubmitted(stageToInfos(stage), properties))

if (tasks.size > 0) {
+      runningStages += stage
+      // SparkListenerStageSubmitted should be posted before testing whether tasks are
+      // serializable. If tasks are not serializable, a SparkListenerStageCompleted event
+      // will be posted, which should always come after a corresponding SparkListenerStageSubmitted
+      // event.
+      listenerBus.post(SparkListenerStageSubmitted(stageToInfos(stage), properties))
Comment from the PR author:

I also moved this event inside the check for tasks.size > 0, because we shouldn't tell the UI/listeners about a stage that has no tasks and therefore won't be run.

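As a rough illustration of the resulting control flow (a hypothetical helper, not the DAGScheduler's code; `submitTasksSketch` and its `post` callback are stand-ins), events are posted only for stages that actually have tasks, and a completion event posted on abort always follows a matching submission:

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// A condensed, hypothetical sketch of the order of operations after this change.
def submitTasksSketch(stageId: Int, tasks: Seq[AnyRef], post: String => Unit): Unit = {
  if (tasks.nonEmpty) {
    post(s"StageSubmitted($stageId)")
    try {
      // Preemptively serialize one task, as the real code does; a failure here
      // aborts the stage.
      new ObjectOutputStream(new ByteArrayOutputStream()).writeObject(tasks.head)
    } catch {
      case _: NotSerializableException =>
        // Abort path: the completion event always follows a matching submission.
        post(s"StageCompleted($stageId)")
    }
  }
  // A zero-task stage posts nothing, so the UI never shows work that won't run.
}
```

For instance, `submitTasksSketch(0, Seq(new Object), println)` prints StageSubmitted(0) and then StageCompleted(0), since a plain Object is not serializable, while `submitTasksSketch(1, Nil, println)` prints nothing.
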

// Preemptively serialize a task to make sure it can be serialized. We are catching this
// exception here because it would be fairly hard to catch the non-serializable exception
// down the road, where we have several different implementations for local scheduler and
129 changes: 69 additions & 60 deletions core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -37,6 +37,29 @@ class BuggyDAGEventProcessActor extends Actor {
}
}

+/**
+ * An RDD for passing to DAGScheduler. These RDDs will use the dependencies and
+ * preferredLocations (if any) that are passed to them. They are deliberately not executable
+ * so we can test that DAGScheduler does not try to execute RDDs locally.
+ */
+class MyRDD(
+    sc: SparkContext,
+    numPartitions: Int,
+    dependencies: List[Dependency[_]],
+    locations: Seq[Seq[String]] = Nil) extends RDD[(Int, Int)](sc, dependencies) with Serializable {
+  override def compute(split: Partition, context: TaskContext): Iterator[(Int, Int)] =
+    throw new RuntimeException("should not be reached")
+  override def getPartitions = (0 until numPartitions).map(i => new Partition {
+    override def index = i
+  }).toArray
+  override def getPreferredLocations(split: Partition): Seq[String] =
+    if (locations.isDefinedAt(split.index))
+      locations(split.index)
+    else
+      Nil
+  override def toString: String = "DAGSchedulerSuiteRDD " + id
+}
+
class DAGSchedulerSuiteDummyException extends Exception

class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with FunSuiteLike
@@ -148,34 +171,7 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with FunSuiteLike
* Type of RDD we use for testing. Note that we should never call the real RDD compute methods.
* This is a pair RDD type so it can always be used in ShuffleDependencies.
*/
-  type MyRDD = RDD[(Int, Int)]
-
-  /**
-   * Create an RDD for passing to DAGScheduler. These RDDs will use the dependencies and
-   * preferredLocations (if any) that are passed to them. They are deliberately not executable
-   * so we can test that DAGScheduler does not try to execute RDDs locally.
-   */
-  private def makeRdd(
-      numPartitions: Int,
-      dependencies: List[Dependency[_]],
-      locations: Seq[Seq[String]] = Nil
-    ): MyRDD = {
-    val maxPartition = numPartitions - 1
-    val newRDD = new MyRDD(sc, dependencies) {
-      override def compute(split: Partition, context: TaskContext): Iterator[(Int, Int)] =
-        throw new RuntimeException("should not be reached")
-      override def getPartitions = (0 to maxPartition).map(i => new Partition {
-        override def index = i
-      }).toArray
-      override def getPreferredLocations(split: Partition): Seq[String] =
-        if (locations.isDefinedAt(split.index))
-          locations(split.index)
-        else
-          Nil
-      override def toString: String = "DAGSchedulerSuiteRDD " + id
-    }
-    newRDD
-  }
+  type PairOfIntsRDD = RDD[(Int, Int)]

/**
* Process the supplied event as if it were the top of the DAGScheduler event queue, expecting
@@ -234,19 +230,19 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with FunSuiteLike
override def taskSucceeded(partition: Int, value: Any) = numResults += 1
override def jobFailed(exception: Exception) = throw exception
}
-    submit(makeRdd(0, Nil), Array(), listener = fakeListener)
+    submit(new MyRDD(sc, 0, Nil), Array(), listener = fakeListener)
assert(numResults === 0)
}

test("run trivial job") {
-    submit(makeRdd(1, Nil), Array(0))
+    submit(new MyRDD(sc, 1, Nil), Array(0))
complete(taskSets(0), List((Success, 42)))
assert(results === Map(0 -> 42))
assertDataStructuresEmpty
}

test("local job") {
-    val rdd = new MyRDD(sc, Nil) {
+    val rdd = new PairOfIntsRDD(sc, Nil) {
override def compute(split: Partition, context: TaskContext): Iterator[(Int, Int)] =
Array(42 -> 0).iterator
override def getPartitions = Array( new Partition { override def index = 0 } )
@@ -260,7 +256,7 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with FunSuiteLike
}

test("local job oom") {
-    val rdd = new MyRDD(sc, Nil) {
+    val rdd = new PairOfIntsRDD(sc, Nil) {
override def compute(split: Partition, context: TaskContext): Iterator[(Int, Int)] =
throw new java.lang.OutOfMemoryError("test local job oom")
override def getPartitions = Array( new Partition { override def index = 0 } )
@@ -274,17 +270,17 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with FunSuiteLike
}

test("run trivial job w/ dependency") {
-    val baseRdd = makeRdd(1, Nil)
-    val finalRdd = makeRdd(1, List(new OneToOneDependency(baseRdd)))
+    val baseRdd = new MyRDD(sc, 1, Nil)
+    val finalRdd = new MyRDD(sc, 1, List(new OneToOneDependency(baseRdd)))
submit(finalRdd, Array(0))
complete(taskSets(0), Seq((Success, 42)))
assert(results === Map(0 -> 42))
assertDataStructuresEmpty
}

test("cache location preferences w/ dependency") {
-    val baseRdd = makeRdd(1, Nil)
-    val finalRdd = makeRdd(1, List(new OneToOneDependency(baseRdd)))
+    val baseRdd = new MyRDD(sc, 1, Nil)
+    val finalRdd = new MyRDD(sc, 1, List(new OneToOneDependency(baseRdd)))
cacheLocations(baseRdd.id -> 0) =
Seq(makeBlockManagerId("hostA"), makeBlockManagerId("hostB"))
submit(finalRdd, Array(0))
@@ -295,8 +291,22 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with FunSuiteLike
assertDataStructuresEmpty
}

test("unserializable task") {
val unserializableRdd = new MyRDD(sc, 1, Nil) {
class UnserializableClass
val unserializable = new UnserializableClass
}
submit(unserializableRdd, Array(0))
assert(failure.getMessage.startsWith(
"Job aborted due to stage failure: Task not serializable:"))
assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
assert(sparkListener.failedStages.contains(0))
assert(sparkListener.failedStages.size === 1)
assertDataStructuresEmpty
}

test("trivial job failure") {
-    submit(makeRdd(1, Nil), Array(0))
+    submit(new MyRDD(sc, 1, Nil), Array(0))
failed(taskSets(0), "some failure")
assert(failure.getMessage === "Job aborted due to stage failure: some failure")
assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
@@ -306,7 +316,7 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with FunSuiteLike
}

test("trivial job cancellation") {
-    val rdd = makeRdd(1, Nil)
+    val rdd = new MyRDD(sc, 1, Nil)
val jobId = submit(rdd, Array(0))
cancel(jobId)
assert(failure.getMessage === s"Job $jobId cancelled ")
@@ -347,8 +357,7 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with FunSuiteLike
}
dagEventProcessTestActor = TestActorRef[DAGSchedulerEventProcessActor](
Props(classOf[DAGSchedulerEventProcessActor], noKillScheduler))(system)
-    val rdd = makeRdd(1, Nil)
-    val jobId = submit(rdd, Array(0))
+    val jobId = submit(new MyRDD(sc, 1, Nil), Array(0))
cancel(jobId)
// Because the job wasn't actually cancelled, we shouldn't have received a failure message.
assert(failure === null)
@@ -364,10 +373,10 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with FunSuiteLike
}

test("run trivial shuffle") {
-    val shuffleMapRdd = makeRdd(2, Nil)
+    val shuffleMapRdd = new MyRDD(sc, 2, Nil)
val shuffleDep = new ShuffleDependency(shuffleMapRdd, null)
val shuffleId = shuffleDep.shuffleId
-    val reduceRdd = makeRdd(1, List(shuffleDep))
+    val reduceRdd = new MyRDD(sc, 1, List(shuffleDep))
submit(reduceRdd, Array(0))
complete(taskSets(0), Seq(
(Success, makeMapStatus("hostA", 1)),
@@ -380,10 +389,10 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with FunSuiteLike
}

test("run trivial shuffle with fetch failure") {
-    val shuffleMapRdd = makeRdd(2, Nil)
+    val shuffleMapRdd = new MyRDD(sc, 2, Nil)
val shuffleDep = new ShuffleDependency(shuffleMapRdd, null)
val shuffleId = shuffleDep.shuffleId
-    val reduceRdd = makeRdd(2, List(shuffleDep))
+    val reduceRdd = new MyRDD(sc, 2, List(shuffleDep))
submit(reduceRdd, Array(0, 1))
complete(taskSets(0), Seq(
(Success, makeMapStatus("hostA", 1)),
@@ -406,10 +415,10 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with FunSuiteLike
}

test("ignore late map task completions") {
-    val shuffleMapRdd = makeRdd(2, Nil)
+    val shuffleMapRdd = new MyRDD(sc, 2, Nil)
val shuffleDep = new ShuffleDependency(shuffleMapRdd, null)
val shuffleId = shuffleDep.shuffleId
-    val reduceRdd = makeRdd(2, List(shuffleDep))
+    val reduceRdd = new MyRDD(sc, 2, List(shuffleDep))
submit(reduceRdd, Array(0, 1))
// pretend we were told hostA went away
val oldEpoch = mapOutputTracker.getEpoch
@@ -435,9 +444,9 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with FunSuiteLike
}

test("run shuffle with map stage failure") {
-    val shuffleMapRdd = makeRdd(2, Nil)
+    val shuffleMapRdd = new MyRDD(sc, 2, Nil)
val shuffleDep = new ShuffleDependency(shuffleMapRdd, null)
-    val reduceRdd = makeRdd(2, List(shuffleDep))
+    val reduceRdd = new MyRDD(sc, 2, List(shuffleDep))
submit(reduceRdd, Array(0, 1))

// Fail the map stage. This should cause the entire job to fail.
@@ -472,13 +481,13 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with FunSuiteLike
* without shuffleMapRdd1.
*/
test("failure of stage used by two jobs") {
-    val shuffleMapRdd1 = makeRdd(2, Nil)
+    val shuffleMapRdd1 = new MyRDD(sc, 2, Nil)
val shuffleDep1 = new ShuffleDependency(shuffleMapRdd1, null)
-    val shuffleMapRdd2 = makeRdd(2, Nil)
+    val shuffleMapRdd2 = new MyRDD(sc, 2, Nil)
val shuffleDep2 = new ShuffleDependency(shuffleMapRdd2, null)

-    val reduceRdd1 = makeRdd(2, List(shuffleDep1))
-    val reduceRdd2 = makeRdd(2, List(shuffleDep1, shuffleDep2))
+    val reduceRdd1 = new MyRDD(sc, 2, List(shuffleDep1))
+    val reduceRdd2 = new MyRDD(sc, 2, List(shuffleDep1, shuffleDep2))

// We need to make our own listeners for this test, since by default submit uses the same
// listener for all jobs, and here we want to capture the failure for each job separately.
@@ -511,10 +520,10 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with FunSuiteLike
}

test("run trivial shuffle with out-of-band failure and retry") {
-    val shuffleMapRdd = makeRdd(2, Nil)
+    val shuffleMapRdd = new MyRDD(sc, 2, Nil)
val shuffleDep = new ShuffleDependency(shuffleMapRdd, null)
val shuffleId = shuffleDep.shuffleId
-    val reduceRdd = makeRdd(1, List(shuffleDep))
+    val reduceRdd = new MyRDD(sc, 1, List(shuffleDep))
submit(reduceRdd, Array(0))
// blockManagerMaster.removeExecutor("exec-hostA")
// pretend we were told hostA went away
@@ -534,11 +543,11 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with FunSuiteLike
}

test("recursive shuffle failures") {
-    val shuffleOneRdd = makeRdd(2, Nil)
+    val shuffleOneRdd = new MyRDD(sc, 2, Nil)
val shuffleDepOne = new ShuffleDependency(shuffleOneRdd, null)
-    val shuffleTwoRdd = makeRdd(2, List(shuffleDepOne))
+    val shuffleTwoRdd = new MyRDD(sc, 2, List(shuffleDepOne))
val shuffleDepTwo = new ShuffleDependency(shuffleTwoRdd, null)
-    val finalRdd = makeRdd(1, List(shuffleDepTwo))
+    val finalRdd = new MyRDD(sc, 1, List(shuffleDepTwo))
submit(finalRdd, Array(0))
// have the first stage complete normally
complete(taskSets(0), Seq(
@@ -563,11 +572,11 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with FunSuiteLike
}

test("cached post-shuffle") {
-    val shuffleOneRdd = makeRdd(2, Nil)
+    val shuffleOneRdd = new MyRDD(sc, 2, Nil)
val shuffleDepOne = new ShuffleDependency(shuffleOneRdd, null)
-    val shuffleTwoRdd = makeRdd(2, List(shuffleDepOne))
+    val shuffleTwoRdd = new MyRDD(sc, 2, List(shuffleDepOne))
val shuffleDepTwo = new ShuffleDependency(shuffleTwoRdd, null)
-    val finalRdd = makeRdd(1, List(shuffleDepTwo))
+    val finalRdd = new MyRDD(sc, 1, List(shuffleDepTwo))
submit(finalRdd, Array(0))
cacheLocations(shuffleTwoRdd.id -> 0) = Seq(makeBlockManagerId("hostD"))
cacheLocations(shuffleTwoRdd.id -> 1) = Seq(makeBlockManagerId("hostC"))