
Commit 57ffaf9

shahidki31 authored and kai-chi committed
[SPARK-26219][CORE][BRANCH-2.4] Executor summary should get updated for failure jobs in the history server UI
Back-ports commit apache#23181 into the Spark 2.4 branch and adds a unit test.

Closes apache#23191 from shahidki31/branch-2.4.

Authored-by: Shahid <shahidki31@gmail.com>
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
1 parent: eb45976

2 files changed: +66 −47 lines changed


core/src/main/scala/org/apache/spark/status/AppStatusListener.scala

Lines changed: 8 additions & 11 deletions
@@ -599,9 +599,14 @@ private[spark] class AppStatusListener(
       }
     }
 
-    // Force an update on live applications when the number of active tasks reaches 0. This is
-    // checked in some tests (e.g. SQLTestUtilsBase) so it needs to be reliably up to date.
-    conditionalLiveUpdate(exec, now, exec.activeTasks == 0)
+    // Force an update on both live and history applications when the number of active tasks
+    // reaches 0. This is checked in some tests (e.g. SQLTestUtilsBase) so it needs to be
+    // reliably up to date.
+    if (exec.activeTasks == 0) {
+      update(exec, now)
+    } else {
+      maybeUpdate(exec, now)
+    }
   }
 }

@@ -954,14 +959,6 @@ private[spark] class AppStatusListener(
     }
   }
 
-  private def conditionalLiveUpdate(entity: LiveEntity, now: Long, condition: Boolean): Unit = {
-    if (condition) {
-      liveUpdate(entity, now)
-    } else {
-      maybeUpdate(entity, now)
-    }
-  }
-
   private def cleanupExecutors(count: Long): Unit = {
     // Because the limit is on the number of *dead* executors, we need to calculate whether
     // there are actually enough dead executors to be deleted.

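For context on why the inlined if/else fixes the history-server case: the removed conditionalLiveUpdate delegated to liveUpdate, which writes only when the application is live, so during event-log replay the executor entity was never persisted at this point. A minimal sketch of the three update paths, with simplified bodies that approximate (but are not copied from) the helpers in AppStatusListener.scala:

  // update: always persist the entity to the KVStore.
  private def update(entity: LiveEntity, now: Long): Unit = {
    entity.write(kvstore, now)
  }

  // maybeUpdate: throttled persist, only for live applications.
  private def maybeUpdate(entity: LiveEntity, now: Long): Unit = {
    if (live && liveUpdatePeriodNs >= 0 && now - entity.lastWriteTime > liveUpdatePeriodNs) {
      update(entity, now)
    }
  }

  // liveUpdate: persist only for live applications; a no-op during replay,
  // which is why routing through it dropped updates for history apps.
  private def liveUpdate(entity: LiveEntity, now: Long): Unit = {
    if (live) {
      update(entity, now)
    }
  }

Calling update(exec, now) directly when exec.activeTasks == 0 therefore flushes the executor summary for both live and replayed applications.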
core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala

Lines changed: 58 additions & 36 deletions
@@ -1274,48 +1274,70 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter {
     assert(allJobs.head.numFailedStages == 1)
   }
 
-  test("SPARK-25451: total tasks in the executor summary should match total stage tasks") {
-    val testConf = conf.clone.set(LIVE_ENTITY_UPDATE_PERIOD, Long.MaxValue)
+  Seq(true, false).foreach { live =>
+    test(s"Total tasks in the executor summary should match total stage tasks (live = $live)") {
 
-    val listener = new AppStatusListener(store, testConf, true)
+      val testConf = if (live) {
+        conf.clone().set(LIVE_ENTITY_UPDATE_PERIOD, Long.MaxValue)
+      } else {
+        conf.clone().set(LIVE_ENTITY_UPDATE_PERIOD, -1L)
+      }
 
-    val stage = new StageInfo(1, 0, "stage", 4, Nil, Nil, "details")
-    listener.onJobStart(SparkListenerJobStart(1, time, Seq(stage), null))
-    listener.onStageSubmitted(SparkListenerStageSubmitted(stage, new Properties()))
+      val listener = new AppStatusListener(store, testConf, live)
 
-    val tasks = createTasks(4, Array("1", "2"))
-    tasks.foreach { task =>
-      listener.onTaskStart(SparkListenerTaskStart(stage.stageId, stage.attemptNumber, task))
-    }
+      Seq("1", "2").foreach { execId =>
+        listener.onExecutorAdded(SparkListenerExecutorAdded(0L, execId,
+          new ExecutorInfo("host1", 1, Map.empty)))
+      }
+      val stage = new StageInfo(1, 0, "stage", 4, Nil, Nil, "details")
+      listener.onJobStart(SparkListenerJobStart(1, time, Seq(stage), null))
+      listener.onStageSubmitted(SparkListenerStageSubmitted(stage, new Properties()))
 
-    time += 1
-    tasks(0).markFinished(TaskState.FINISHED, time)
-    listener.onTaskEnd(SparkListenerTaskEnd(stage.stageId, stage.attemptId, "taskType",
-      Success, tasks(0), null))
-    time += 1
-    tasks(1).markFinished(TaskState.FINISHED, time)
-    listener.onTaskEnd(SparkListenerTaskEnd(stage.stageId, stage.attemptId, "taskType",
-      Success, tasks(1), null))
+      val tasks = createTasks(4, Array("1", "2"))
+      tasks.foreach { task =>
+        listener.onTaskStart(SparkListenerTaskStart(stage.stageId, stage.attemptNumber, task))
+      }
 
-    stage.failureReason = Some("Failed")
-    listener.onStageCompleted(SparkListenerStageCompleted(stage))
-    time += 1
-    listener.onJobEnd(SparkListenerJobEnd(1, time, JobFailed(new RuntimeException("Bad Executor"))))
+      time += 1
+      tasks(0).markFinished(TaskState.FINISHED, time)
+      listener.onTaskEnd(SparkListenerTaskEnd(stage.stageId, stage.attemptNumber, "taskType",
+        Success, tasks(0), null))
+      time += 1
+      tasks(1).markFinished(TaskState.FINISHED, time)
+      listener.onTaskEnd(SparkListenerTaskEnd(stage.stageId, stage.attemptNumber, "taskType",
+        Success, tasks(1), null))
 
-    time += 1
-    tasks(2).markFinished(TaskState.FAILED, time)
-    listener.onTaskEnd(SparkListenerTaskEnd(stage.stageId, stage.attemptId, "taskType",
-      ExecutorLostFailure("1", true, Some("Lost executor")), tasks(2), null))
-    time += 1
-    tasks(3).markFinished(TaskState.FAILED, time)
-    listener.onTaskEnd(SparkListenerTaskEnd(stage.stageId, stage.attemptId, "taskType",
-      ExecutorLostFailure("2", true, Some("Lost executor")), tasks(3), null))
-
-    val esummary = store.view(classOf[ExecutorStageSummaryWrapper]).asScala.map(_.info)
-    esummary.foreach { execSummary =>
-      assert(execSummary.failedTasks === 1)
-      assert(execSummary.succeededTasks === 1)
-      assert(execSummary.killedTasks === 0)
+      stage.failureReason = Some("Failed")
+      listener.onStageCompleted(SparkListenerStageCompleted(stage))
+      time += 1
+      listener.onJobEnd(SparkListenerJobEnd(1, time, JobFailed(
+        new RuntimeException("Bad Executor"))))
+
+      time += 1
+      tasks(2).markFinished(TaskState.FAILED, time)
+      listener.onTaskEnd(SparkListenerTaskEnd(stage.stageId, stage.attemptNumber, "taskType",
+        ExecutorLostFailure("1", true, Some("Lost executor")), tasks(2), null))
+      time += 1
+      tasks(3).markFinished(TaskState.FAILED, time)
+      listener.onTaskEnd(SparkListenerTaskEnd(stage.stageId, stage.attemptNumber, "taskType",
+        ExecutorLostFailure("2", true, Some("Lost executor")), tasks(3), null))
+
+      val esummary = store.view(classOf[ExecutorStageSummaryWrapper]).asScala.map(_.info)
+      esummary.foreach { execSummary =>
+        assert(execSummary.failedTasks === 1)
+        assert(execSummary.succeededTasks === 1)
+        assert(execSummary.killedTasks === 0)
+      }
+
+      val allExecutorSummary = store.view(classOf[ExecutorSummaryWrapper]).asScala.map(_.info)
+      assert(allExecutorSummary.size === 2)
+      allExecutorSummary.foreach { allExecSummary =>
+        assert(allExecSummary.failedTasks === 1)
+        assert(allExecSummary.activeTasks === 0)
+        assert(allExecSummary.completedTasks === 1)
+      }
+      store.delete(classOf[ExecutorSummaryWrapper], "1")
+      store.delete(classOf[ExecutorSummaryWrapper], "2")
     }
   }

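The Seq(true, false).foreach wrapper is a plain ScalaTest idiom: the foreach runs at suite-construction time, so each call to test(...) registers a separate named test case, one per parameter value. A minimal standalone sketch of the same pattern (hypothetical suite and test names, assuming the ScalaTest 3.0-era org.scalatest.FunSuite used by Spark 2.4):

  import org.scalatest.FunSuite

  class ParameterizedExampleSuite extends FunSuite {
    // Registers two tests: "... (live = true)" and "... (live = false)".
    Seq(true, false).foreach { live =>
      test(s"executor summary should match stage tasks (live = $live)") {
        // In the real suite, `live` selects the LIVE_ENTITY_UPDATE_PERIOD
        // (Long.MaxValue for live apps, -1L to emulate history replay).
        assert(live || !live)  // placeholder assertion
      }
    }
  }

Running both modes through the same assertions is what catches the regression: before this patch, the live = false variant would observe stale executor summaries.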