@@ -1274,48 +1274,70 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter {
12741274 assert(allJobs.head.numFailedStages == 1 )
12751275 }
12761276
1277- test( " SPARK-25451: total tasks in the executor summary should match total stage tasks " ) {
1278- val testConf = conf.clone.set( LIVE_ENTITY_UPDATE_PERIOD , Long . MaxValue )
1277+ Seq ( true , false ).foreach { live =>
1278+ test( s " Total tasks in the executor summary should match total stage tasks (live = $live ) " ) {
12791279
1280- val listener = new AppStatusListener (store, testConf, true )
1280+ val testConf = if (live) {
1281+ conf.clone().set(LIVE_ENTITY_UPDATE_PERIOD , Long .MaxValue )
1282+ } else {
1283+ conf.clone().set(LIVE_ENTITY_UPDATE_PERIOD , - 1L )
1284+ }
12811285
1282- val stage = new StageInfo (1 , 0 , " stage" , 4 , Nil , Nil , " details" )
1283- listener.onJobStart(SparkListenerJobStart (1 , time, Seq (stage), null ))
1284- listener.onStageSubmitted(SparkListenerStageSubmitted (stage, new Properties ()))
1286+ val listener = new AppStatusListener (store, testConf, live)
12851287
1286- val tasks = createTasks(4 , Array (" 1" , " 2" ))
1287- tasks.foreach { task =>
1288- listener.onTaskStart(SparkListenerTaskStart (stage.stageId, stage.attemptNumber, task))
1289- }
1288+ Seq (" 1" , " 2" ).foreach { execId =>
1289+ listener.onExecutorAdded(SparkListenerExecutorAdded (0L , execId,
1290+ new ExecutorInfo (" host1" , 1 , Map .empty)))
1291+ }
1292+ val stage = new StageInfo (1 , 0 , " stage" , 4 , Nil , Nil , " details" )
1293+ listener.onJobStart(SparkListenerJobStart (1 , time, Seq (stage), null ))
1294+ listener.onStageSubmitted(SparkListenerStageSubmitted (stage, new Properties ()))
12901295
1291- time += 1
1292- tasks(0 ).markFinished(TaskState .FINISHED , time)
1293- listener.onTaskEnd(SparkListenerTaskEnd (stage.stageId, stage.attemptId, " taskType" ,
1294- Success , tasks(0 ), null ))
1295- time += 1
1296- tasks(1 ).markFinished(TaskState .FINISHED , time)
1297- listener.onTaskEnd(SparkListenerTaskEnd (stage.stageId, stage.attemptId, " taskType" ,
1298- Success , tasks(1 ), null ))
1296+ val tasks = createTasks(4 , Array (" 1" , " 2" ))
1297+ tasks.foreach { task =>
1298+ listener.onTaskStart(SparkListenerTaskStart (stage.stageId, stage.attemptNumber, task))
1299+ }
12991300
1300- stage.failureReason = Some (" Failed" )
1301- listener.onStageCompleted(SparkListenerStageCompleted (stage))
1302- time += 1
1303- listener.onJobEnd(SparkListenerJobEnd (1 , time, JobFailed (new RuntimeException (" Bad Executor" ))))
1301+ time += 1
1302+ tasks(0 ).markFinished(TaskState .FINISHED , time)
1303+ listener.onTaskEnd(SparkListenerTaskEnd (stage.stageId, stage.attemptNumber, " taskType" ,
1304+ Success , tasks(0 ), null ))
1305+ time += 1
1306+ tasks(1 ).markFinished(TaskState .FINISHED , time)
1307+ listener.onTaskEnd(SparkListenerTaskEnd (stage.stageId, stage.attemptNumber, " taskType" ,
1308+ Success , tasks(1 ), null ))
13041309
1305- time += 1
1306- tasks(2 ).markFinished(TaskState .FAILED , time)
1307- listener.onTaskEnd(SparkListenerTaskEnd (stage.stageId, stage.attemptId, " taskType" ,
1308- ExecutorLostFailure (" 1" , true , Some (" Lost executor" )), tasks(2 ), null ))
1309- time += 1
1310- tasks(3 ).markFinished(TaskState .FAILED , time)
1311- listener.onTaskEnd(SparkListenerTaskEnd (stage.stageId, stage.attemptId, " taskType" ,
1312- ExecutorLostFailure (" 2" , true , Some (" Lost executor" )), tasks(3 ), null ))
1313-
1314- val esummary = store.view(classOf [ExecutorStageSummaryWrapper ]).asScala.map(_.info)
1315- esummary.foreach { execSummary =>
1316- assert(execSummary.failedTasks === 1 )
1317- assert(execSummary.succeededTasks === 1 )
1318- assert(execSummary.killedTasks === 0 )
1310+ stage.failureReason = Some (" Failed" )
1311+ listener.onStageCompleted(SparkListenerStageCompleted (stage))
1312+ time += 1
1313+ listener.onJobEnd(SparkListenerJobEnd (1 , time, JobFailed (
1314+ new RuntimeException (" Bad Executor" ))))
1315+
1316+ time += 1
1317+ tasks(2 ).markFinished(TaskState .FAILED , time)
1318+ listener.onTaskEnd(SparkListenerTaskEnd (stage.stageId, stage.attemptNumber, " taskType" ,
1319+ ExecutorLostFailure (" 1" , true , Some (" Lost executor" )), tasks(2 ), null ))
1320+ time += 1
1321+ tasks(3 ).markFinished(TaskState .FAILED , time)
1322+ listener.onTaskEnd(SparkListenerTaskEnd (stage.stageId, stage.attemptNumber, " taskType" ,
1323+ ExecutorLostFailure (" 2" , true , Some (" Lost executor" )), tasks(3 ), null ))
1324+
1325+ val esummary = store.view(classOf [ExecutorStageSummaryWrapper ]).asScala.map(_.info)
1326+ esummary.foreach { execSummary =>
1327+ assert(execSummary.failedTasks === 1 )
1328+ assert(execSummary.succeededTasks === 1 )
1329+ assert(execSummary.killedTasks === 0 )
1330+ }
1331+
1332+ val allExecutorSummary = store.view(classOf [ExecutorSummaryWrapper ]).asScala.map(_.info)
1333+ assert(allExecutorSummary.size === 2 )
1334+ allExecutorSummary.foreach { allExecSummary =>
1335+ assert(allExecSummary.failedTasks === 1 )
1336+ assert(allExecSummary.activeTasks === 0 )
1337+ assert(allExecSummary.completedTasks === 1 )
1338+ }
1339+ store.delete(classOf [ExecutorSummaryWrapper ], " 1" )
1340+ store.delete(classOf [ExecutorSummaryWrapper ], " 2" )
13191341 }
13201342 }
13211343
0 commit comments