[SPARK-26260][Core]For disk store tasks summary table should show only successful tasks summary #26508

Closed
wants to merge 16 commits
82 changes: 23 additions & 59 deletions core/src/main/scala/org/apache/spark/status/AppStatusStore.scala
@@ -136,12 +136,6 @@ private[spark] class AppStatusStore(
store.read(classOf[StageDataWrapper], Array(stageId, stageAttemptId)).locality
}

// SPARK-26119: we only want to consider successful tasks when calculating the metrics summary,
// but currently this is very expensive when using a disk store. So we only trigger the slower
// code path when we know we have all data in memory. The following method checks whether all
// the data will be in memory.
private def isInMemoryStore: Boolean = store.isInstanceOf[InMemoryStore] || listener.isDefined

/**
* Calculates a summary of the task metrics for the given stage attempt, returning the
* requested quantiles for the recorded metrics.
@@ -162,21 +156,11 @@ private[spark] class AppStatusStore(
// cheaper for disk stores (avoids deserialization).
val count = {
Utils.tryWithResource(
if (isInMemoryStore) {
// For Live UI, we should count the tasks with status "SUCCESS" only.
store.view(classOf[TaskDataWrapper])
.parent(stageKey)
.index(TaskIndexNames.STATUS)
.first("SUCCESS")
.last("SUCCESS")
.closeableIterator()
} else {
store.view(classOf[TaskDataWrapper])
.parent(stageKey)
.index(TaskIndexNames.EXEC_RUN_TIME)
.first(0L)
.closeableIterator()
}
store.view(classOf[TaskDataWrapper])
.parent(stageKey)
.index(TaskIndexNames.EXEC_RUN_TIME)
.first(0L)
.closeableIterator()
) { it =>
var _count = 0L
while (it.hasNext()) {
@@ -245,50 +229,30 @@ private[spark] class AppStatusStore(
// stabilize once the stage finishes. It's also slow, especially with disk stores.
val indices = quantiles.map { q => math.min((q * count).toLong, count - 1) }

// TODO: Summary metrics needs to display all the successful tasks' metrics (SPARK-26119).
// For InMemory case, it is efficient to find using the following code. But for diskStore case
// we need an efficient solution to avoid deserialization time overhead. For that, we need to
// rework on the way indexing works, so that we can index by specific metrics for successful
// and failed tasks differently (would be tricky). Also would require changing the disk store
// version (to invalidate old stores).
def scanTasks(index: String)(fn: TaskDataWrapper => Long): IndexedSeq[Double] = {
if (isInMemoryStore) {
val quantileTasks = store.view(classOf[TaskDataWrapper])
Utils.tryWithResource(
store.view(classOf[TaskDataWrapper])
.parent(stageKey)
.index(index)
.first(0L)
.asScala
.filter { _.status == "SUCCESS"} // Filter "SUCCESS" tasks
.toIndexedSeq

indices.map { index =>
fn(quantileTasks(index.toInt)).toDouble
}.toIndexedSeq
} else {
Utils.tryWithResource(
store.view(classOf[TaskDataWrapper])
.parent(stageKey)
.index(index)
.first(0L)
.closeableIterator()
) { it =>
var last = Double.NaN
var currentIdx = -1L
indices.map { idx =>
if (idx == currentIdx) {
.closeableIterator()
) { it =>
var last = Double.NaN
var currentIdx = -1L
indices.map { idx =>
if (idx == currentIdx) {
last
} else {
val diff = idx - currentIdx
currentIdx = idx
if (it.skip(diff - 1)) {
last = fn(it.next()).toDouble
last
} else {
val diff = idx - currentIdx
currentIdx = idx
if (it.skip(diff - 1)) {
last = fn(it.next()).toDouble
last
} else {
Double.NaN
}
Double.NaN
}
}.toIndexedSeq
}
}
}.toIndexedSeq
}
}
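
For context (not part of the patch): a minimal, self-contained sketch of the quantile-to-index arithmetic above. The quantile values and the successful-task count are assumptions chosen for illustration; the clamping expression mirrors the `indices` computation in this file. Because non-successful tasks now store their metrics as negative values, scanning a per-stage metric index from `first(0L)` upwards only ever visits successful tasks, so the same positional skip works for in-memory and disk stores.

```scala
// Illustrative only: mapping requested quantiles onto positions in a sorted,
// successful-tasks-only metric index (all numbers are made up).
object QuantileIndexSketch extends App {
  val quantiles = Array(0.05, 0.25, 0.5, 0.75, 0.95) // assumed example quantiles
  val count = 40L                                     // assumed number of successful tasks

  // Same arithmetic as `indices` above: clamp to the last valid position.
  val indices = quantiles.map { q => math.min((q * count).toLong, count - 1) }

  println(indices.mkString(", ")) // prints: 2, 10, 20, 30, 38
}
```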

@@ -582,7 +546,7 @@ private[spark] class AppStatusStore(

private[spark] object AppStatusStore {

val CURRENT_VERSION = 1L
val CURRENT_VERSION = 2L

/**
* Create an in-memory store for a live application.
102 changes: 78 additions & 24 deletions core/src/main/scala/org/apache/spark/status/LiveEntity.scala
@@ -184,6 +184,19 @@ private class LiveTask(
info.timeRunning(lastUpdateTime.getOrElse(System.currentTimeMillis()))
}

val hasMetrics = metrics.executorDeserializeTime >= 0

/**
* SPARK-26260: For non-successful tasks, store the metrics as negative values so they are
* excluded from the task summary calculation. The `toApi` method in `TaskDataWrapper`
* restores the actual values.
*/
val taskMetrics: v1.TaskMetrics = if (hasMetrics && !info.successful) {
makeNegative(metrics)
} else {
metrics
}

new TaskDataWrapper(
info.taskId,
info.index,
@@ -199,30 +212,31 @@ private class LiveTask(
newAccumulatorInfos(info.accumulables),
errorMessage,

metrics.executorDeserializeTime,
metrics.executorDeserializeCpuTime,
metrics.executorRunTime,
metrics.executorCpuTime,
metrics.resultSize,
metrics.jvmGcTime,
metrics.resultSerializationTime,
metrics.memoryBytesSpilled,
metrics.diskBytesSpilled,
metrics.peakExecutionMemory,
metrics.inputMetrics.bytesRead,
metrics.inputMetrics.recordsRead,
metrics.outputMetrics.bytesWritten,
metrics.outputMetrics.recordsWritten,
metrics.shuffleReadMetrics.remoteBlocksFetched,
metrics.shuffleReadMetrics.localBlocksFetched,
metrics.shuffleReadMetrics.fetchWaitTime,
metrics.shuffleReadMetrics.remoteBytesRead,
metrics.shuffleReadMetrics.remoteBytesReadToDisk,
metrics.shuffleReadMetrics.localBytesRead,
metrics.shuffleReadMetrics.recordsRead,
metrics.shuffleWriteMetrics.bytesWritten,
metrics.shuffleWriteMetrics.writeTime,
metrics.shuffleWriteMetrics.recordsWritten,
hasMetrics,
taskMetrics.executorDeserializeTime,
taskMetrics.executorDeserializeCpuTime,
taskMetrics.executorRunTime,
taskMetrics.executorCpuTime,
taskMetrics.resultSize,
taskMetrics.jvmGcTime,
taskMetrics.resultSerializationTime,
taskMetrics.memoryBytesSpilled,
taskMetrics.diskBytesSpilled,
taskMetrics.peakExecutionMemory,
taskMetrics.inputMetrics.bytesRead,
taskMetrics.inputMetrics.recordsRead,
taskMetrics.outputMetrics.bytesWritten,
taskMetrics.outputMetrics.recordsWritten,
taskMetrics.shuffleReadMetrics.remoteBlocksFetched,
taskMetrics.shuffleReadMetrics.localBlocksFetched,
taskMetrics.shuffleReadMetrics.fetchWaitTime,
taskMetrics.shuffleReadMetrics.remoteBytesRead,
taskMetrics.shuffleReadMetrics.remoteBytesReadToDisk,
taskMetrics.shuffleReadMetrics.localBytesRead,
taskMetrics.shuffleReadMetrics.recordsRead,
taskMetrics.shuffleWriteMetrics.bytesWritten,
taskMetrics.shuffleWriteMetrics.writeTime,
taskMetrics.shuffleWriteMetrics.recordsWritten,

stageId,
stageAttemptId)
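
As an aside (not taken from the patch; `storedMetric` is a hypothetical stand-in), this sketch shows how a single metric value ends up stored in `TaskDataWrapper` for the task outcomes handled above:

```scala
// Hypothetical helper for illustration only. None models `hasMetrics == false`,
// i.e. the task's metrics were never recorded.
def storedMetric(hasMetrics: Boolean, successful: Boolean, actual: Long): Option[Long] = {
  if (!hasMetrics) None              // metrics not recorded for this task
  else if (successful) Some(actual)  // successful tasks keep the real value
  else Some(actual * -1L - 1L)       // running/failed/killed tasks store the encoded negative
}

assert(storedMetric(hasMetrics = true, successful = true, actual = 42L) == Some(42L))
assert(storedMetric(hasMetrics = true, successful = false, actual = 42L) == Some(-43L))
assert(storedMetric(hasMetrics = false, successful = false, actual = 0L).isEmpty)
```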
@@ -710,6 +724,46 @@ private object LiveEntityHelpers {
addMetrics(m1, m2, -1)
}

/**
* Convert all metric values to negative values, handling zeros as well.
* This method assumes that all metric values are greater than or equal to zero.
*/
def makeNegative(m: v1.TaskMetrics): v1.TaskMetrics = {
// To keep a 0 metric value distinguishable, add 1 before negating the metric.
// To recover the actual value, compute `math.abs(metric + 1)`.
// E.g. metric values (5, 3, 0, 1) are stored as (-6, -4, -1, -2),
// and math.abs(metric + 1) recovers (5, 3, 0, 1).
def updateMetricValue(metric: Long): Long = {
metric * -1L - 1L
}

createMetrics(
updateMetricValue(m.executorDeserializeTime),
updateMetricValue(m.executorDeserializeCpuTime),
updateMetricValue(m.executorRunTime),
updateMetricValue(m.executorCpuTime),
updateMetricValue(m.resultSize),
updateMetricValue(m.jvmGcTime),
updateMetricValue(m.resultSerializationTime),
updateMetricValue(m.memoryBytesSpilled),
updateMetricValue(m.diskBytesSpilled),
updateMetricValue(m.peakExecutionMemory),
updateMetricValue(m.inputMetrics.bytesRead),
updateMetricValue(m.inputMetrics.recordsRead),
updateMetricValue(m.outputMetrics.bytesWritten),
updateMetricValue(m.outputMetrics.recordsWritten),
updateMetricValue(m.shuffleReadMetrics.remoteBlocksFetched),
updateMetricValue(m.shuffleReadMetrics.localBlocksFetched),
updateMetricValue(m.shuffleReadMetrics.fetchWaitTime),
updateMetricValue(m.shuffleReadMetrics.remoteBytesRead),
updateMetricValue(m.shuffleReadMetrics.remoteBytesReadToDisk),
updateMetricValue(m.shuffleReadMetrics.localBytesRead),
updateMetricValue(m.shuffleReadMetrics.recordsRead),
updateMetricValue(m.shuffleWriteMetrics.bytesWritten),
updateMetricValue(m.shuffleWriteMetrics.writeTime),
updateMetricValue(m.shuffleWriteMetrics.recordsWritten))
}
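
A self-contained round-trip check of this encoding (illustration only; `encode` and `decode` are local stand-ins for `updateMetricValue` above and `TaskDataWrapper.getMetricValue` in storeTypes.scala):

```scala
// Shifting by 1 keeps a metric value of 0 distinguishable after negation,
// and math.abs(metric + 1) restores the original value.
def encode(metric: Long): Long = metric * -1L - 1L    // mirrors updateMetricValue
def decode(metric: Long): Long = math.abs(metric + 1) // mirrors getMetricValue

val original = Seq(5L, 3L, 0L, 1L)
assert(original.map(encode) == Seq(-6L, -4L, -1L, -2L))
assert(original.map(encode).map(decode) == original)
```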

private def addMetrics(m1: v1.TaskMetrics, m2: v1.TaskMetrics, mult: Int): v1.TaskMetrics = {
createMetrics(
m1.executorDeserializeTime + m2.executorDeserializeTime * mult,
76 changes: 44 additions & 32 deletions core/src/main/scala/org/apache/spark/status/storeTypes.scala
@@ -177,10 +177,13 @@ private[spark] class TaskDataWrapper(
val accumulatorUpdates: Seq[AccumulableInfo],
val errorMessage: Option[String],

val hasMetrics: Boolean,
// The following is an exploded view of a TaskMetrics API object. This saves 5 objects
// (= 80 bytes of Java object overhead) per instance of this wrapper. If the first value
// (executorDeserializeTime) is -1L, it means the metrics for this task have not been
// recorded.
// (= 80 bytes of Java object overhead) per instance of this wrapper. Non-successful
// tasks' metrics are stored as negative values in `TaskDataWrapper`, while `TaskData`
// holds the actual metric values. To recover the actual metric value from
// `TaskDataWrapper`, use the `getMetricValue` method. If `hasMetrics` is false, it means
// the metrics for this task have not been recorded.
@KVIndexParam(value = TaskIndexNames.DESER_TIME, parent = TaskIndexNames.STAGE)
val executorDeserializeTime: Long,
@KVIndexParam(value = TaskIndexNames.DESER_CPU_TIME, parent = TaskIndexNames.STAGE)
@@ -233,39 +236,46 @@
val stageId: Int,
val stageAttemptId: Int) {

def hasMetrics: Boolean = executorDeserializeTime >= 0
// SPARK-26260: Handle metrics of non-successful tasks (running, failed, killed).
private def getMetricValue(metric: Long): Long = {
if (status != "SUCCESS") {
math.abs(metric + 1)
} else {
metric
}
}

def toApi: TaskData = {
val metrics = if (hasMetrics) {
Some(new TaskMetrics(
executorDeserializeTime,
executorDeserializeCpuTime,
executorRunTime,
executorCpuTime,
resultSize,
jvmGcTime,
resultSerializationTime,
memoryBytesSpilled,
diskBytesSpilled,
peakExecutionMemory,
getMetricValue(executorDeserializeTime),
getMetricValue(executorDeserializeCpuTime),
getMetricValue(executorRunTime),
getMetricValue(executorCpuTime),
getMetricValue(resultSize),
getMetricValue(jvmGcTime),
getMetricValue(resultSerializationTime),
getMetricValue(memoryBytesSpilled),
getMetricValue(diskBytesSpilled),
getMetricValue(peakExecutionMemory),
new InputMetrics(
inputBytesRead,
inputRecordsRead),
getMetricValue(inputBytesRead),
getMetricValue(inputRecordsRead)),
new OutputMetrics(
outputBytesWritten,
outputRecordsWritten),
getMetricValue(outputBytesWritten),
getMetricValue(outputRecordsWritten)),
new ShuffleReadMetrics(
shuffleRemoteBlocksFetched,
shuffleLocalBlocksFetched,
shuffleFetchWaitTime,
shuffleRemoteBytesRead,
shuffleRemoteBytesReadToDisk,
shuffleLocalBytesRead,
shuffleRecordsRead),
getMetricValue(shuffleRemoteBlocksFetched),
getMetricValue(shuffleLocalBlocksFetched),
getMetricValue(shuffleFetchWaitTime),
getMetricValue(shuffleRemoteBytesRead),
getMetricValue(shuffleRemoteBytesReadToDisk),
getMetricValue(shuffleLocalBytesRead),
getMetricValue(shuffleRecordsRead)),
new ShuffleWriteMetrics(
shuffleBytesWritten,
shuffleWriteTime,
shuffleRecordsWritten)))
getMetricValue(shuffleBytesWritten),
getMetricValue(shuffleWriteTime),
getMetricValue(shuffleRecordsWritten))))
} else {
None
}
@@ -296,8 +306,10 @@ private[spark] class TaskDataWrapper(
@JsonIgnore @KVIndex(value = TaskIndexNames.SCHEDULER_DELAY, parent = TaskIndexNames.STAGE)
def schedulerDelay: Long = {
if (hasMetrics) {
AppStatusUtils.schedulerDelay(launchTime, resultFetchStart, duration, executorDeserializeTime,
resultSerializationTime, executorRunTime)
AppStatusUtils.schedulerDelay(launchTime, resultFetchStart, duration,
getMetricValue(executorDeserializeTime),
getMetricValue(resultSerializationTime),
getMetricValue(executorRunTime))
} else {
-1L
}
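
For context, a rough, self-contained illustration of what `AppStatusUtils.schedulerDelay` computes with the decoded metrics (all numbers are made-up millisecond values, not taken from this PR):

```scala
// Rough sketch: scheduler delay is the part of the task duration not spent on
// executor work, deserialization, result serialization, or fetching the result,
// floored at zero.
val duration = 1000L
val executorRunTime = 800L
val executorDeserializeTime = 50L
val resultSerializationTime = 20L
val gettingResultTime = 30L

val schedulerDelay = math.max(0L,
  duration - executorRunTime - executorDeserializeTime -
    resultSerializationTime - gettingResultTime)
// schedulerDelay == 100
```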
@@ -330,7 +342,7 @@ private[spark] class TaskDataWrapper(
@JsonIgnore @KVIndex(value = TaskIndexNames.SHUFFLE_TOTAL_READS, parent = TaskIndexNames.STAGE)
private def shuffleTotalReads: Long = {
if (hasMetrics) {
shuffleLocalBytesRead + shuffleRemoteBytesRead
getMetricValue(shuffleLocalBytesRead) + getMetricValue(shuffleRemoteBytesRead)
} else {
-1L
}
@@ -339,7 +351,7 @@ private[spark] class TaskDataWrapper(
@JsonIgnore @KVIndex(value = TaskIndexNames.SHUFFLE_TOTAL_BLOCKS, parent = TaskIndexNames.STAGE)
private def shuffleTotalBlocks: Long = {
if (hasMetrics) {
shuffleLocalBlocksFetched + shuffleRemoteBlocksFetched
getMetricValue(shuffleLocalBlocksFetched) + getMetricValue(shuffleRemoteBlocksFetched)
} else {
-1L
}