SPARK-2298: Show stage attempt in UI #1384

Status: Closed. Wants to merge 2 commits.
core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala

@@ -753,6 +753,8 @@ class DAGScheduler(
       null
     }
 
+    stageToInfos(stage) = StageInfo.fromStage(stage)
+
     // must be run listener before possible NotSerializableException
     // should be "StageSubmitted" first and then "JobEnded"
     listenerBus.post(SparkListenerStageSubmitted(stageToInfos(stage), properties))
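The added line rebuilds the StageInfo snapshot on each stage submission, so a resubmitted stage is reported with its latest attempt id rather than a stale cached one. A minimal sketch of that idea, using hypothetical stand-ins (FakeStage, FakeStageInfo) rather than Spark's own classes:

// Hypothetical model, not Spark code: the attempt counter advances between
// submissions, so a fresh snapshot must be taken on every submission.
class FakeStage(val id: Int) {
  private var nextAttemptId = 0
  def attemptId: Int = nextAttemptId
  def markResubmitted(): Unit = nextAttemptId += 1  // e.g. after a fetch failure
}

case class FakeStageInfo(stageId: Int, attemptId: Int)

object ResubmitDemo extends App {
  val stage = new FakeStage(1)
  val first = FakeStageInfo(stage.id, stage.attemptId)   // snapshot of attempt 0
  stage.markResubmitted()
  val second = FakeStageInfo(stage.id, stage.attemptId)  // snapshot of attempt 1
  println(s"$first then $second")                        // same id, different attempts
}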
core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala

@@ -27,6 +27,7 @@ import org.apache.spark.storage.RDDInfo
 @DeveloperApi
 class StageInfo(
     val stageId: Int,
+    val attemptId: Int,
     val name: String,
     val numTasks: Int,
     val rddInfos: Seq[RDDInfo],
@@ -55,6 +56,6 @@ private[spark] object StageInfo {
   def fromStage(stage: Stage): StageInfo = {
     val ancestorRddInfos = stage.rdd.getNarrowAncestors.map(RDDInfo.fromRdd)
     val rddInfos = Seq(RDDInfo.fromRdd(stage.rdd)) ++ ancestorRddInfos
-    new StageInfo(stage.id, stage.name, stage.numTasks, rddInfos, stage.details)
+    new StageInfo(stage.id, stage.attemptId, stage.name, stage.numTasks, rddInfos, stage.details)
   }
 }
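With attemptId now part of the public StageInfo, a listener can tell a retry apart from a first run. A short usage sketch; StageRetryLogger is a hypothetical name, while the SparkListener callback and the event's stageInfo field are real:

import org.apache.spark.scheduler.{SparkListener, SparkListenerStageSubmitted}

// Logs every stage submission that is not the first attempt.
class StageRetryLogger extends SparkListener {
  override def onStageSubmitted(event: SparkListenerStageSubmitted): Unit = {
    val info = event.stageInfo
    if (info.attemptId > 0) {
      // attemptId starts at 0, so any positive value marks a resubmission
      println(s"Stage ${info.stageId} resubmitted as attempt ${info.attemptId}")
    }
  }
}

// Usage: sc.addSparkListener(new StageRetryLogger)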
core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala (5 additions, 2 deletions)

@@ -36,7 +36,7 @@ private[ui] class StageTableBase(
   protected def isFairScheduler = parent.isFairScheduler
 
   protected def columns: Seq[Node] = {
-    <th>Stage Id</th> ++
+    <th>ID</th> ++
     {if (isFairScheduler) {<th>Pool Name</th>} else Seq.empty} ++
     <th>Description</th>
     <th>Submitted</th>
@@ -141,7 +141,10 @@
     val shuffleWrite = stageData.shuffleWriteBytes
     val shuffleWriteWithUnit = if (shuffleWrite > 0) Utils.bytesToString(shuffleWrite) else ""
 
-    <td>{s.stageId}</td> ++
+    <td>{s.stageId}{if (s.attemptId > 0) {
+        " (Attempt %d)".format(s.attemptId + 1)
+      }}
+    </td> ++
     {if (isFairScheduler) {
       <td>
         <a href={"%s/stages/pool?poolname=%s"
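The ID column now renders the bare stage id for attempt 0 and a 1-based "(Attempt n)" suffix otherwise. A standalone sketch of just that formatting (stageIdLabel is a hypothetical helper, not code from this PR):

object StageIdLabelDemo extends App {
  def stageIdLabel(stageId: Int, attemptId: Int): String =
    if (attemptId > 0) s"$stageId (Attempt ${attemptId + 1})" else stageId.toString

  assert(stageIdLabel(3, 0) == "3")              // first run: plain id
  assert(stageIdLabel(3, 1) == "3 (Attempt 2)")  // attemptId 1 shows as "Attempt 2"
}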
core/src/main/scala/org/apache/spark/util/JsonProtocol.scala (3 additions, 1 deletion)

@@ -184,6 +184,7 @@ private[spark] object JsonProtocol {
     val completionTime = stageInfo.completionTime.map(JInt(_)).getOrElse(JNothing)
     val failureReason = stageInfo.failureReason.map(JString(_)).getOrElse(JNothing)
     ("Stage ID" -> stageInfo.stageId) ~
+    ("Attempt ID" -> stageInfo.attemptId) ~
     ("Stage Name" -> stageInfo.name) ~
     ("Number of Tasks" -> stageInfo.numTasks) ~
     ("RDD Info" -> rddInfo) ~
@@ -478,6 +479,7 @@
 
   def stageInfoFromJson(json: JValue): StageInfo = {
     val stageId = (json \ "Stage ID").extract[Int]
+    val attemptId = (json \ "Attempt ID").extractOpt[Int].getOrElse(0)
     val stageName = (json \ "Stage Name").extract[String]
     val numTasks = (json \ "Number of Tasks").extract[Int]
     val rddInfos = (json \ "RDD Info").extract[List[JValue]].map(rddInfoFromJson)
@@ -486,7 +488,7 @@
     val completionTime = Utils.jsonOption(json \ "Completion Time").map(_.extract[Long])
     val failureReason = Utils.jsonOption(json \ "Failure Reason").map(_.extract[String])
 
-    val stageInfo = new StageInfo(stageId, stageName, numTasks, rddInfos, details)
+    val stageInfo = new StageInfo(stageId, attemptId, stageName, numTasks, rddInfos, details)
     stageInfo.submissionTime = submissionTime
     stageInfo.completionTime = completionTime
     stageInfo.failureReason = failureReason
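The extractOpt(...).getOrElse(0) line is what keeps old event logs readable: JSON written before this change has no "Attempt ID" field, so the parser falls back to attempt 0. A self-contained sketch of that fallback using json4s, the same JSON library JsonProtocol is built on (AttemptIdCompatDemo and the sample JSON are illustrative):

import org.json4s._
import org.json4s.jackson.JsonMethods.parse

object AttemptIdCompatDemo extends App {
  implicit val formats: Formats = DefaultFormats

  // A pre-change StageInfo blob: note there is no "Attempt ID" key.
  val oldJson = parse("""{"Stage ID":7,"Stage Name":"map"}""")
  val attemptId = (oldJson \ "Attempt ID").extractOpt[Int].getOrElse(0)
  println(attemptId)  // prints 0: a missing field is treated as the first attempt
}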
core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala

@@ -31,27 +31,27 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matchers {
     conf.set("spark.ui.retainedStages", 5.toString)
     val listener = new JobProgressListener(conf)
 
-    def createStageStartEvent(stageId: Int) = {
-      val stageInfo = new StageInfo(stageId, stageId.toString, 0, null, "")
+    def createStageStartEvent(stageId: Int, stageAttemptId: Int) = {
+      val stageInfo = new StageInfo(stageId, stageAttemptId, stageId.toString, 0, null, "")
       SparkListenerStageSubmitted(stageInfo)
     }
 
-    def createStageEndEvent(stageId: Int) = {
-      val stageInfo = new StageInfo(stageId, stageId.toString, 0, null, "")
+    def createStageEndEvent(stageId: Int, stageAttemptId: Int) = {
+      val stageInfo = new StageInfo(stageId, stageAttemptId, stageId.toString, 0, null, "")
       SparkListenerStageCompleted(stageInfo)
     }
 
     for (i <- 1 to 50) {
-      listener.onStageSubmitted(createStageStartEvent(i))
-      listener.onStageCompleted(createStageEndEvent(i))
+      listener.onStageSubmitted(createStageStartEvent(i, 50-i))
+      listener.onStageCompleted(createStageEndEvent(i, 50-i))
     }
 
     listener.completedStages.size should be (5)
-    listener.completedStages.count(_.stageId == 50) should be (1)
-    listener.completedStages.count(_.stageId == 49) should be (1)
-    listener.completedStages.count(_.stageId == 48) should be (1)
-    listener.completedStages.count(_.stageId == 47) should be (1)
-    listener.completedStages.count(_.stageId == 46) should be (1)
+    listener.completedStages.filter(_.stageId == 50).filter(_.attemptId == 0).size should be (1)
+    listener.completedStages.filter(_.stageId == 49).filter(_.attemptId == 1).size should be (1)
+    listener.completedStages.filter(_.stageId == 48).filter(_.attemptId == 2).size should be (1)
+    listener.completedStages.filter(_.stageId == 47).filter(_.attemptId == 3).size should be (1)
+    listener.completedStages.filter(_.stageId == 46).filter(_.attemptId == 4).size should be (1)
   }
 
   test("test executor id to summary") {
core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala (20 additions, 8 deletions)

@@ -33,8 +33,8 @@ class JsonProtocolSuite extends FunSuite {
 
   test("SparkListenerEvent") {
     val stageSubmitted =
-      SparkListenerStageSubmitted(makeStageInfo(100, 200, 300, 400L, 500L), properties)
-    val stageCompleted = SparkListenerStageCompleted(makeStageInfo(101, 201, 301, 401L, 501L))
+      SparkListenerStageSubmitted(makeStageInfo(100, 150, 200, 300, 400L, 500L), properties)
+    val stageCompleted = SparkListenerStageCompleted(makeStageInfo(101, 151, 201, 301, 401L, 501L))
     val taskStart = SparkListenerTaskStart(111, makeTaskInfo(222L, 333, 1, 444L, false))
     val taskGettingResult =
       SparkListenerTaskGettingResult(makeTaskInfo(1000L, 2000, 5, 3000L, true))
@@ -78,7 +78,7 @@
 
   test("Dependent Classes") {
     testRDDInfo(makeRddInfo(2, 3, 4, 5L, 6L))
-    testStageInfo(makeStageInfo(10, 20, 30, 40L, 50L))
+    testStageInfo(makeStageInfo(10, 15, 20, 30, 40L, 50L))
     testTaskInfo(makeTaskInfo(999L, 888, 55, 777L, false))
     testTaskMetrics(makeTaskMetrics(33333L, 44444L, 55555L, 66666L, 7, 8, hasHadoopInput = false))
     testBlockManagerId(BlockManagerId("Hong", "Kong", 500, 1000))
@@ -125,7 +125,7 @@
 
   test("StageInfo.details backward compatibility") {
     // StageInfo.details was added after 1.0.0.
-    val info = makeStageInfo(1, 2, 3, 4L, 5L)
+    val info = makeStageInfo(1, 11, 2, 3, 4L, 5L)
     assert(info.details.nonEmpty)
     val newJson = JsonProtocol.stageInfoToJson(info)
     val oldJson = newJson.removeField { case (field, _) => field == "Details" }
@@ -134,6 +134,17 @@
     assert("" === newInfo.details)
   }
 
+  test("StageInfo.attemptId backward compatibility") {
+    // StageInfo.attemptId was added after 1.1.0.
+    val info = makeStageInfo(1, 11, 2, 3, 4L, 5L)
+    assert(info.details.nonEmpty)
+    val newJson = JsonProtocol.stageInfoToJson(info)
+    val oldJson = newJson.removeField { case (field, _) => field == "Attempt ID" }
+    val newInfo = JsonProtocol.stageInfoFromJson(oldJson)
+    assert(info.name === newInfo.name)
+    assert(0 === newInfo.attemptId)
+  }
+
   test("InputMetrics backward compatibility") {
     // InputMetrics were added after 1.0.1.
     val metrics = makeTaskMetrics(1L, 2L, 3L, 4L, 5, 6, hasHadoopInput = true)
@@ -253,6 +264,7 @@
 
   private def assertEquals(info1: StageInfo, info2: StageInfo) {
     assert(info1.stageId === info2.stageId)
+    assert(info1.attemptId === info2.attemptId)
     assert(info1.name === info2.name)
     assert(info1.numTasks === info2.numTasks)
     assert(info1.submissionTime === info2.submissionTime)
@@ -475,9 +487,9 @@
     r
   }
 
-  private def makeStageInfo(a: Int, b: Int, c: Int, d: Long, e: Long) = {
+  private def makeStageInfo(a: Int, attemptId: Int, b: Int, c: Int, d: Long, e: Long) = {
     val rddInfos = (0 until a % 5).map { i => makeRddInfo(a + i, b + i, c + i, d + i, e + i) }
-    new StageInfo(a, "greetings", b, rddInfos, "details")
+    new StageInfo(a, attemptId, "greetings", b, rddInfos, "details")
   }
 
   private def makeTaskInfo(a: Long, b: Int, c: Int, d: Long, speculative: Boolean) = {
@@ -537,14 +549,14 @@
 
   private val stageSubmittedJsonString =
     """
-      {"Event":"SparkListenerStageSubmitted","Stage Info":{"Stage ID":100,"Stage Name":
+      {"Event":"SparkListenerStageSubmitted","Stage Info":{"Stage ID":100,"Attempt ID":150,"Stage Name":
       "greetings","Number of Tasks":200,"RDD Info":[],"Details":"details"},"Properties":
       {"France":"Paris","Germany":"Berlin","Russia":"Moscow","Ukraine":"Kiev"}}
     """
 
   private val stageCompletedJsonString =
     """
-      {"Event":"SparkListenerStageCompleted","Stage Info":{"Stage ID":101,"Stage Name":
+      {"Event":"SparkListenerStageCompleted","Stage Info":{"Stage ID":101,"Attempt ID":151,"Stage Name":
       "greetings","Number of Tasks":201,"RDD Info":[{"RDD ID":101,"Name":"mayor","Storage
       Level":{"Use Disk":true,"Use Memory":true,"Use Tachyon":false,"Deserialized":true,
       "Replication":1},"Number of Partitions":201,"Number of Cached Partitions":301,
Expand Down