Skip to content

Commit 446948a

Browse files
smurakozi authored and Marcelo Vanzin committed
[SPARK-23121][CORE] Fix for UI becoming inaccessible for long running streaming apps
## What changes were proposed in this pull request?

The allJobs page and the job page attempt to use the stage attempt and DAG visualization from the store, but for long running jobs these are not guaranteed to be retained, leading to exceptions when the pages are rendered.

To fix it, `store.lastStageAttempt(stageId)` and `store.operationGraphForJob(jobId)` are wrapped in `store.asOption`, and default values are used if the info is missing.

## How was this patch tested?

Manual testing of the UI, also using the test command reported in SPARK-23121:

./bin/spark-submit --class org.apache.spark.examples.streaming.HdfsWordCount ./examples/jars/spark-examples_2.11-2.4.0-SNAPSHOT.jar /spark

Closes #20287

Author: Sandor Murakozi <smurakozi@gmail.com>

Closes #20330 from smurakozi/SPARK-23121.
1 parent 4327ccf commit 446948a

File tree

3 files changed

+27
-16
lines changed

3 files changed

+27
-16
lines changed

core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala

Lines changed: 13 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,9 @@ import org.apache.spark.util.Utils
3636

3737
/** Page showing list of all ongoing and recently finished jobs */
3838
private[ui] class AllJobsPage(parent: JobsTab, store: AppStatusStore) extends WebUIPage("") {
39+
40+
import ApiHelper._
41+
3942
private val JOBS_LEGEND =
4043
<div class="legend-area"><svg width="150px" height="85px">
4144
<rect class="succeeded-job-legend"
@@ -65,10 +68,9 @@ private[ui] class AllJobsPage(parent: JobsTab, store: AppStatusStore) extends We
6568
}.map { job =>
6669
val jobId = job.jobId
6770
val status = job.status
68-
val jobDescription = store.lastStageAttempt(job.stageIds.max).description
69-
val displayJobDescription = jobDescription
70-
.map(UIUtils.makeDescription(_, "", plainText = true).text)
71-
.getOrElse("")
71+
val (_, lastStageDescription) = lastStageNameAndDescription(store, job)
72+
val jobDescription = UIUtils.makeDescription(lastStageDescription, "", plainText = true).text
73+
7274
val submissionTime = job.submissionTime.get.getTime()
7375
val completionTime = job.completionTime.map(_.getTime()).getOrElse(System.currentTimeMillis())
7476
val classNameByStatus = status match {
@@ -80,7 +82,7 @@ private[ui] class AllJobsPage(parent: JobsTab, store: AppStatusStore) extends We
8082

8183
// The timeline library treats contents as HTML, so we have to escape them. We need to add
8284
// extra layers of escaping in order to embed this in a Javascript string literal.
83-
val escapedDesc = Utility.escape(displayJobDescription)
85+
val escapedDesc = Utility.escape(jobDescription)
8486
val jsEscapedDesc = StringEscapeUtils.escapeEcmaScript(escapedDesc)
8587
val jobEventJsonAsStr =
8688
s"""
@@ -430,6 +432,8 @@ private[ui] class JobDataSource(
430432
sortColumn: String,
431433
desc: Boolean) extends PagedDataSource[JobTableRowData](pageSize) {
432434

435+
import ApiHelper._
436+
433437
// Convert JobUIData to JobTableRowData which contains the final contents to show in the table
434438
// so that we can avoid creating duplicate contents during sorting the data
435439
private val data = jobs.map(jobRow).sorted(ordering(sortColumn, desc))
@@ -454,23 +458,21 @@ private[ui] class JobDataSource(
454458
val formattedDuration = duration.map(d => UIUtils.formatDuration(d)).getOrElse("Unknown")
455459
val submissionTime = jobData.submissionTime
456460
val formattedSubmissionTime = submissionTime.map(UIUtils.formatDate).getOrElse("Unknown")
457-
val lastStageAttempt = store.lastStageAttempt(jobData.stageIds.max)
458-
val lastStageDescription = lastStageAttempt.description.getOrElse("")
461+
val (lastStageName, lastStageDescription) = lastStageNameAndDescription(store, jobData)
459462

460-
val formattedJobDescription =
461-
UIUtils.makeDescription(lastStageDescription, basePath, plainText = false)
463+
val jobDescription = UIUtils.makeDescription(lastStageDescription, basePath, plainText = false)
462464

463465
val detailUrl = "%s/jobs/job?id=%s".format(basePath, jobData.jobId)
464466

465467
new JobTableRowData(
466468
jobData,
467-
lastStageAttempt.name,
469+
lastStageName,
468470
lastStageDescription,
469471
duration.getOrElse(-1),
470472
formattedDuration,
471473
submissionTime.map(_.getTime()).getOrElse(-1L),
472474
formattedSubmissionTime,
473-
formattedJobDescription,
475+
jobDescription,
474476
detailUrl
475477
)
476478
}

core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -336,8 +336,14 @@ private[ui] class JobPage(parent: JobsTab, store: AppStatusStore) extends WebUIP
336336
content ++= makeTimeline(activeStages ++ completedStages ++ failedStages,
337337
store.executorList(false), appStartTime)
338338

339-
content ++= UIUtils.showDagVizForJob(
340-
jobId, store.operationGraphForJob(jobId))
339+
val operationGraphContent = store.asOption(store.operationGraphForJob(jobId)) match {
340+
case Some(operationGraph) => UIUtils.showDagVizForJob(jobId, operationGraph)
341+
case None =>
342+
<div id="no-info">
343+
<p>No DAG visualization information to display for job {jobId}</p>
344+
</div>
345+
}
346+
content ++= operationGraphContent
341347

342348
if (shouldShowActiveStages) {
343349
content ++=

core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,12 +23,10 @@ import java.util.concurrent.TimeUnit
2323
import javax.servlet.http.HttpServletRequest
2424

2525
import scala.collection.mutable.{HashMap, HashSet}
26-
import scala.xml.{Elem, Node, Unparsed}
26+
import scala.xml.{Node, Unparsed}
2727

2828
import org.apache.commons.lang3.StringEscapeUtils
2929

30-
import org.apache.spark.SparkConf
31-
import org.apache.spark.internal.config._
3230
import org.apache.spark.scheduler.TaskLocality
3331
import org.apache.spark.status._
3432
import org.apache.spark.status.api.v1._
@@ -1020,4 +1018,9 @@ private object ApiHelper {
10201018
}
10211019
}
10221020

1021+
/**
 * Looks up the name and description to display for a job, taken from the last attempt of the
 * stage with the highest id among `job.stageIds`.
 *
 * @param store status store queried for the stage attempt
 * @param job   job whose last stage should be described
 * @return a `(name, description)` pair; the name defaults to the empty string and the
 *         description defaults to `job.name` when the stage information is unavailable
 */
def lastStageNameAndDescription(store: AppStatusStore, job: JobData): (String, String) = {
  // `max` on an empty collection throws UnsupportedOperationException, so a job without any
  // stages must be handled before asking for the highest stage id.
  // NOTE(review): `store.asOption` is defined elsewhere; presumably it only maps a missing
  // store entry to None and would not absorb that exception -- confirm.
  val stage =
    if (job.stageIds.isEmpty) {
      None
    } else {
      store.asOption(store.lastStageAttempt(job.stageIds.max))
    }
  (stage.map(_.name).getOrElse(""), stage.flatMap(_.description).getOrElse(job.name))
}
1025+
10231026
}

0 commit comments

Comments
 (0)