Skip to content

[SPARK-3984] [SPARK-3983] Fix incorrect scheduler delay and display task deserialization time in UI #2832

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions core/src/main/scala/org/apache/spark/executor/Executor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ private[spark] class Executor(
}

override def run() {
val startTime = System.currentTimeMillis()
val deserializeStartTime = System.currentTimeMillis()
Thread.currentThread.setContextClassLoader(replClassLoader)
val ser = SparkEnv.get.closureSerializer.newInstance()
logInfo(s"Running $taskName (TID $taskId)")
Expand Down Expand Up @@ -197,7 +197,7 @@ private[spark] class Executor(
val afterSerialization = System.currentTimeMillis()

for (m <- task.metrics) {
m.executorDeserializeTime = taskStart - startTime
m.executorDeserializeTime = taskStart - deserializeStartTime
m.executorRunTime = taskFinish - taskStart
m.jvmGCTime = gcTime - startGCTime
m.resultSerializationTime = afterSerialization - beforeSerialization
Expand Down
3 changes: 3 additions & 0 deletions core/src/main/scala/org/apache/spark/ui/ToolTips.scala
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ private[spark] object ToolTips {
scheduler delay is large, consider decreasing the size of tasks or decreasing the size
of task results."""

val TASK_DESERIALIZATION_TIME =
"""Time spent deserializating the task closure on the executor."""

val INPUT = "Bytes read from Hadoop or from Spark storage."

val SHUFFLE_WRITE = "Bytes written to disk in order to be read by a shuffle in a future stage."
Expand Down
31 changes: 30 additions & 1 deletion core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,13 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") {
<span class="additional-metric-title">Scheduler Delay</span>
</span>
</li>
<li>
<span data-toggle="tooltip"
title={ToolTips.TASK_DESERIALIZATION_TIME} data-placement="right">
<input type="checkbox" name={TaskDetailsClassNames.TASK_DESERIALIZATION_TIME}/>
<span class="additional-metric-title">Task Deserialization Time</span>
</span>
</li>
<li>
<span data-toggle="tooltip"
title={ToolTips.GC_TIME} data-placement="right">
Expand Down Expand Up @@ -147,6 +154,7 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") {
("Index", ""), ("ID", ""), ("Attempt", ""), ("Status", ""), ("Locality Level", ""),
("Executor ID / Host", ""), ("Launch Time", ""), ("Duration", ""),
("Scheduler Delay", TaskDetailsClassNames.SCHEDULER_DELAY),
("Task Deserialization Time", TaskDetailsClassNames.TASK_DESERIALIZATION_TIME),
("GC Time", TaskDetailsClassNames.GC_TIME),
("Result Serialization Time", TaskDetailsClassNames.RESULT_SERIALIZATION_TIME),
("Getting Result Time", TaskDetailsClassNames.GETTING_RESULT_TIME)) ++
Expand Down Expand Up @@ -179,6 +187,17 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") {
}
}

val deserializationTimes = validTasks.map { case TaskUIData(_, metrics, _) =>
metrics.get.executorDeserializeTime.toDouble
}
val deserializationQuantiles =
<td>
<span data-toggle="tooltip" title={ToolTips.TASK_DESERIALIZATION_TIME}
data-placement="right">
Task Deserialization Time
</span>
</td> +: getFormattedTimeQuantiles(deserializationTimes)

val serviceTimes = validTasks.map { case TaskUIData(_, metrics, _) =>
metrics.get.executorRunTime.toDouble
}
Expand Down Expand Up @@ -266,6 +285,9 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") {
val listings: Seq[Seq[Node]] = Seq(
<tr>{serviceQuantiles}</tr>,
<tr class={TaskDetailsClassNames.SCHEDULER_DELAY}>{schedulerDelayQuantiles}</tr>,
<tr class={TaskDetailsClassNames.TASK_DESERIALIZATION_TIME}>
{deserializationQuantiles}
</tr>
<tr class={TaskDetailsClassNames.GC_TIME}>{gcQuantiles}</tr>,
<tr class={TaskDetailsClassNames.RESULT_SERIALIZATION_TIME}>
{serializationQuantiles}
Expand Down Expand Up @@ -314,6 +336,7 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") {
else metrics.map(m => UIUtils.formatDuration(m.executorRunTime)).getOrElse("")
val schedulerDelay = getSchedulerDelay(info, metrics.get)
val gcTime = metrics.map(_.jvmGCTime).getOrElse(0L)
val taskDeserializationTime = metrics.map(_.executorDeserializeTime).getOrElse(0L)
val serializationTime = metrics.map(_.resultSerializationTime).getOrElse(0L)
val gettingResultTime = info.gettingResultTime

Expand Down Expand Up @@ -367,6 +390,10 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") {
class={TaskDetailsClassNames.SCHEDULER_DELAY}>
{UIUtils.formatDuration(schedulerDelay.toLong)}
</td>
<td sorttable_customkey={taskDeserializationTime.toString}
class={TaskDetailsClassNames.TASK_DESERIALIZATION_TIME}>
{UIUtils.formatDuration(taskDeserializationTime.toLong)}
</td>
<td sorttable_customkey={gcTime.toString} class={TaskDetailsClassNames.GC_TIME}>
{if (gcTime > 0) UIUtils.formatDuration(gcTime) else ""}
</td>
Expand Down Expand Up @@ -424,6 +451,8 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") {
(info.finishTime - info.launchTime)
}
}
totalExecutionTime - metrics.executorRunTime
val executorOverhead = (metrics.executorDeserializeTime +
metrics.resultSerializationTime)
totalExecutionTime - metrics.executorRunTime - executorOverhead
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ package org.apache.spark.ui.jobs
private object TaskDetailsClassNames {
val SCHEDULER_DELAY = "scheduler_delay"
val GC_TIME = "gc_time"
val TASK_DESERIALIZATION_TIME = "deserialization_time"
val RESULT_SERIALIZATION_TIME = "serialization_time"
val GETTING_RESULT_TIME = "getting_result_time"
}