Skip to content

Commit f7a15d6

Browse files
committed
address comment
1 parent a5690f6 commit f7a15d6

File tree

4 files changed

+223
-236
lines changed

4 files changed

+223
-236
lines changed

core/src/main/scala/org/apache/spark/status/AppStatusStore.scala

Lines changed: 29 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -158,7 +158,7 @@ private[spark] class AppStatusStore(
158158
Utils.tryWithResource(
159159
store.view(classOf[TaskDataWrapper])
160160
.parent(stageKey)
161-
.index(SuccessTaskIndexNames.EXEC_RUN_TIME)
161+
.index(TaskIndexNames.EXEC_RUN_TIME)
162162
.first(0L)
163163
.closeableIterator()
164164
) { it =>
@@ -258,53 +258,53 @@ private[spark] class AppStatusStore(
258258

259259
val computedQuantiles = new v1.TaskMetricDistributions(
260260
quantiles = quantiles,
261-
executorDeserializeTime = scanTasks(SuccessTaskIndexNames.DESER_TIME) { t =>
261+
executorDeserializeTime = scanTasks(TaskIndexNames.DESER_TIME) { t =>
262262
t.executorDeserializeTime
263263
},
264-
executorDeserializeCpuTime = scanTasks(SuccessTaskIndexNames.DESER_CPU_TIME) { t =>
264+
executorDeserializeCpuTime = scanTasks(TaskIndexNames.DESER_CPU_TIME) { t =>
265265
t.executorDeserializeCpuTime
266266
},
267-
executorRunTime = scanTasks(SuccessTaskIndexNames.EXEC_RUN_TIME) { t => t.executorRunTime },
268-
executorCpuTime = scanTasks(SuccessTaskIndexNames.EXEC_CPU_TIME) { t => t.executorCpuTime },
269-
resultSize = scanTasks(SuccessTaskIndexNames.RESULT_SIZE) { t => t.resultSize },
270-
jvmGcTime = scanTasks(SuccessTaskIndexNames.GC_TIME) { t => t.jvmGcTime },
271-
resultSerializationTime = scanTasks(SuccessTaskIndexNames.SER_TIME) { t =>
267+
executorRunTime = scanTasks(TaskIndexNames.EXEC_RUN_TIME) { t => t.executorRunTime },
268+
executorCpuTime = scanTasks(TaskIndexNames.EXEC_CPU_TIME) { t => t.executorCpuTime },
269+
resultSize = scanTasks(TaskIndexNames.RESULT_SIZE) { t => t.resultSize },
270+
jvmGcTime = scanTasks(TaskIndexNames.GC_TIME) { t => t.jvmGcTime },
271+
resultSerializationTime = scanTasks(TaskIndexNames.SER_TIME) { t =>
272272
t.resultSerializationTime
273273
},
274-
gettingResultTime = scanTasks(SuccessTaskIndexNames.GETTING_RESULT_TIME) { t =>
274+
gettingResultTime = scanTasks(TaskIndexNames.GETTING_RESULT_TIME) { t =>
275275
t.gettingResultTime
276276
},
277-
schedulerDelay = scanTasks(SuccessTaskIndexNames.SCHEDULER_DELAY) { t => t.schedulerDelay },
278-
peakExecutionMemory = scanTasks(SuccessTaskIndexNames.PEAK_MEM) { t =>
277+
schedulerDelay = scanTasks(TaskIndexNames.SCHEDULER_DELAY) { t => t.schedulerDelay },
278+
peakExecutionMemory = scanTasks(TaskIndexNames.PEAK_MEM) { t =>
279279
t.peakExecutionMemory },
280-
memoryBytesSpilled = scanTasks(SuccessTaskIndexNames.MEM_SPILL) { t => t.memoryBytesSpilled },
281-
diskBytesSpilled = scanTasks(SuccessTaskIndexNames.DISK_SPILL) { t => t.diskBytesSpilled },
280+
memoryBytesSpilled = scanTasks(TaskIndexNames.MEM_SPILL) { t => t.memoryBytesSpilled },
281+
diskBytesSpilled = scanTasks(TaskIndexNames.DISK_SPILL) { t => t.diskBytesSpilled },
282282
inputMetrics = new v1.InputMetricDistributions(
283-
scanTasks(SuccessTaskIndexNames.INPUT_SIZE) { t => t.inputBytesRead },
284-
scanTasks(SuccessTaskIndexNames.INPUT_RECORDS) { t => t.inputRecordsRead }),
283+
scanTasks(TaskIndexNames.INPUT_SIZE) { t => t.inputBytesRead },
284+
scanTasks(TaskIndexNames.INPUT_RECORDS) { t => t.inputRecordsRead }),
285285
outputMetrics = new v1.OutputMetricDistributions(
286-
scanTasks(SuccessTaskIndexNames.OUTPUT_SIZE) { t => t.outputBytesWritten },
287-
scanTasks(SuccessTaskIndexNames.OUTPUT_RECORDS) { t => t.outputRecordsWritten }),
286+
scanTasks(TaskIndexNames.OUTPUT_SIZE) { t => t.outputBytesWritten },
287+
scanTasks(TaskIndexNames.OUTPUT_RECORDS) { t => t.outputRecordsWritten }),
288288
shuffleReadMetrics = new v1.ShuffleReadMetricDistributions(
289-
scanTasks(SuccessTaskIndexNames.SHUFFLE_TOTAL_READS) { m =>
289+
scanTasks(TaskIndexNames.SHUFFLE_TOTAL_READS) { m =>
290290
m.shuffleLocalBytesRead + m.shuffleRemoteBytesRead
291291
},
292-
scanTasks(SuccessTaskIndexNames.SHUFFLE_READ_RECORDS) { t => t.shuffleRecordsRead },
293-
scanTasks(SuccessTaskIndexNames.SHUFFLE_REMOTE_BLOCKS) { t =>
292+
scanTasks(TaskIndexNames.SHUFFLE_READ_RECORDS) { t => t.shuffleRecordsRead },
293+
scanTasks(TaskIndexNames.SHUFFLE_REMOTE_BLOCKS) { t =>
294294
t.shuffleRemoteBlocksFetched },
295-
scanTasks(SuccessTaskIndexNames.SHUFFLE_LOCAL_BLOCKS) { t => t.shuffleLocalBlocksFetched },
296-
scanTasks(SuccessTaskIndexNames.SHUFFLE_READ_TIME) { t => t.shuffleFetchWaitTime },
297-
scanTasks(SuccessTaskIndexNames.SHUFFLE_REMOTE_READS) { t => t.shuffleRemoteBytesRead },
298-
scanTasks(SuccessTaskIndexNames.SHUFFLE_REMOTE_READS_TO_DISK) { t =>
295+
scanTasks(TaskIndexNames.SHUFFLE_LOCAL_BLOCKS) { t => t.shuffleLocalBlocksFetched },
296+
scanTasks(TaskIndexNames.SHUFFLE_READ_TIME) { t => t.shuffleFetchWaitTime },
297+
scanTasks(TaskIndexNames.SHUFFLE_REMOTE_READS) { t => t.shuffleRemoteBytesRead },
298+
scanTasks(TaskIndexNames.SHUFFLE_REMOTE_READS_TO_DISK) { t =>
299299
t.shuffleRemoteBytesReadToDisk
300300
},
301-
scanTasks(SuccessTaskIndexNames.SHUFFLE_TOTAL_BLOCKS) { m =>
301+
scanTasks(TaskIndexNames.SHUFFLE_TOTAL_BLOCKS) { m =>
302302
m.shuffleLocalBlocksFetched + m.shuffleRemoteBlocksFetched
303303
}),
304304
shuffleWriteMetrics = new v1.ShuffleWriteMetricDistributions(
305-
scanTasks(SuccessTaskIndexNames.SHUFFLE_WRITE_SIZE) { t => t.shuffleBytesWritten },
306-
scanTasks(SuccessTaskIndexNames.SHUFFLE_WRITE_RECORDS) { t => t.shuffleRecordsWritten },
307-
scanTasks(SuccessTaskIndexNames.SHUFFLE_WRITE_TIME) { t => t.shuffleWriteTime }))
305+
scanTasks(TaskIndexNames.SHUFFLE_WRITE_SIZE) { t => t.shuffleBytesWritten },
306+
scanTasks(TaskIndexNames.SHUFFLE_WRITE_RECORDS) { t => t.shuffleRecordsWritten },
307+
scanTasks(TaskIndexNames.SHUFFLE_WRITE_TIME) { t => t.shuffleWriteTime }))
308308

309309
// Go through the computed quantiles and cache the values that match the caching criteria.
310310
computedQuantiles.quantiles.zipWithIndex
@@ -548,7 +548,7 @@ private[spark] class AppStatusStore(
548548

549549
private[spark] object AppStatusStore {
550550

551-
val CURRENT_VERSION = 1L
551+
val CURRENT_VERSION = 2L
552552

553553
/**
554554
* Create an in-memory store for a live application.

core/src/main/scala/org/apache/spark/status/LiveEntity.scala

Lines changed: 85 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -22,13 +22,15 @@ import java.util.concurrent.atomic.AtomicInteger
2222

2323
import scala.collection.immutable.{HashSet, TreeSet}
2424
import scala.collection.mutable.HashMap
25+
import scala.collection.mutable
2526

2627
import com.google.common.collect.Interners
2728

2829
import org.apache.spark.JobExecutionStatus
2930
import org.apache.spark.executor.{ExecutorMetrics, TaskMetrics}
3031
import org.apache.spark.resource.ResourceInformation
3132
import org.apache.spark.scheduler.{AccumulableInfo, StageInfo, TaskInfo}
33+
import org.apache.spark.status.TaskIndexNames._
3234
import org.apache.spark.status.api.v1
3335
import org.apache.spark.storage.{RDDInfo, StorageLevel}
3436
import org.apache.spark.ui.SparkUI
@@ -184,6 +186,19 @@ private class LiveTask(
184186
info.timeRunning(lastUpdateTime.getOrElse(System.currentTimeMillis()))
185187
}
186188

189+
val hasMetrics = metrics.executorDeserializeTime >= 0
190+
val handleZeros = mutable.HashSet[String]()
191+
192+
/**
193+
* For unsuccessful tasks, store the metrics as negative to avoid the calculation in the
194+
* task summary. The `toApi` method in TaskDataWrapper will restore the actual values.
195+
*/
196+
val taskMetrics: v1.TaskMetrics = if (hasMetrics && !info.successful) {
197+
makeNegative(metrics, handleZeros)
198+
} else {
199+
metrics
200+
}
201+
187202
new TaskDataWrapper(
188203
info.taskId,
189204
info.index,
@@ -199,30 +214,32 @@ private class LiveTask(
199214
newAccumulatorInfos(info.accumulables),
200215
errorMessage,
201216

202-
metrics.executorDeserializeTime,
203-
metrics.executorDeserializeCpuTime,
204-
metrics.executorRunTime,
205-
metrics.executorCpuTime,
206-
metrics.resultSize,
207-
metrics.jvmGcTime,
208-
metrics.resultSerializationTime,
209-
metrics.memoryBytesSpilled,
210-
metrics.diskBytesSpilled,
211-
metrics.peakExecutionMemory,
212-
metrics.inputMetrics.bytesRead,
213-
metrics.inputMetrics.recordsRead,
214-
metrics.outputMetrics.bytesWritten,
215-
metrics.outputMetrics.recordsWritten,
216-
metrics.shuffleReadMetrics.remoteBlocksFetched,
217-
metrics.shuffleReadMetrics.localBlocksFetched,
218-
metrics.shuffleReadMetrics.fetchWaitTime,
219-
metrics.shuffleReadMetrics.remoteBytesRead,
220-
metrics.shuffleReadMetrics.remoteBytesReadToDisk,
221-
metrics.shuffleReadMetrics.localBytesRead,
222-
metrics.shuffleReadMetrics.recordsRead,
223-
metrics.shuffleWriteMetrics.bytesWritten,
224-
metrics.shuffleWriteMetrics.writeTime,
225-
metrics.shuffleWriteMetrics.recordsWritten,
217+
hasMetrics,
218+
handleZeros,
219+
taskMetrics.executorDeserializeTime,
220+
taskMetrics.executorDeserializeCpuTime,
221+
taskMetrics.executorRunTime,
222+
taskMetrics.executorCpuTime,
223+
taskMetrics.resultSize,
224+
taskMetrics.jvmGcTime,
225+
taskMetrics.resultSerializationTime,
226+
taskMetrics.memoryBytesSpilled,
227+
taskMetrics.diskBytesSpilled,
228+
taskMetrics.peakExecutionMemory,
229+
taskMetrics.inputMetrics.bytesRead,
230+
taskMetrics.inputMetrics.recordsRead,
231+
taskMetrics.outputMetrics.bytesWritten,
232+
taskMetrics.outputMetrics.recordsWritten,
233+
taskMetrics.shuffleReadMetrics.remoteBlocksFetched,
234+
taskMetrics.shuffleReadMetrics.localBlocksFetched,
235+
taskMetrics.shuffleReadMetrics.fetchWaitTime,
236+
taskMetrics.shuffleReadMetrics.remoteBytesRead,
237+
taskMetrics.shuffleReadMetrics.remoteBytesReadToDisk,
238+
taskMetrics.shuffleReadMetrics.localBytesRead,
239+
taskMetrics.shuffleReadMetrics.recordsRead,
240+
taskMetrics.shuffleWriteMetrics.bytesWritten,
241+
taskMetrics.shuffleWriteMetrics.writeTime,
242+
taskMetrics.shuffleWriteMetrics.recordsWritten,
226243

227244
stageId,
228245
stageAttemptId)
@@ -710,6 +727,50 @@ private object LiveEntityHelpers {
710727
addMetrics(m1, m2, -1)
711728
}
712729

730+
/**
731+
* Convert all the metric values to negative; a zero value becomes -1 and its index name is added to `handleZeros`.
732+
* This method assumes that all the metric values are greater than or equal to zero.
733+
*/
734+
def makeNegative(
735+
m: v1.TaskMetrics,
736+
handleZeros: mutable.HashSet[String]): v1.TaskMetrics = {
737+
// If the metric value is 0, return -1 instead and record the metric's index name in handleZeros.
738+
def updateMetricValue(metric: Long, index: String): Long = {
739+
if (metric == 0L) {
740+
handleZeros.add(index)
741+
-1L
742+
} else {
743+
metric * -1L
744+
}
745+
}
746+
747+
createMetrics(
748+
updateMetricValue(m.executorDeserializeTime, DESER_TIME),
749+
updateMetricValue(m.executorDeserializeCpuTime, DESER_CPU_TIME),
750+
updateMetricValue(m.executorRunTime, EXEC_RUN_TIME),
751+
updateMetricValue(m.executorCpuTime, EXEC_CPU_TIME),
752+
updateMetricValue(m.resultSize, RESULT_SIZE),
753+
updateMetricValue(m.jvmGcTime, GC_TIME),
754+
updateMetricValue(m.resultSerializationTime, SER_TIME),
755+
updateMetricValue(m.memoryBytesSpilled, MEM_SPILL),
756+
updateMetricValue(m.diskBytesSpilled, DISK_SPILL),
757+
updateMetricValue(m.peakExecutionMemory, PEAK_MEM),
758+
updateMetricValue(m.inputMetrics.bytesRead, INPUT_SIZE),
759+
updateMetricValue(m.inputMetrics.recordsRead, INPUT_RECORDS),
760+
updateMetricValue(m.outputMetrics.bytesWritten, OUTPUT_SIZE),
761+
updateMetricValue(m.outputMetrics.recordsWritten, OUTPUT_RECORDS),
762+
updateMetricValue(m.shuffleReadMetrics.remoteBlocksFetched, SHUFFLE_REMOTE_BLOCKS),
763+
updateMetricValue(m.shuffleReadMetrics.localBlocksFetched, SHUFFLE_LOCAL_BLOCKS),
764+
updateMetricValue(m.shuffleReadMetrics.fetchWaitTime, SHUFFLE_READ_TIME),
765+
updateMetricValue(m.shuffleReadMetrics.remoteBytesRead, SHUFFLE_REMOTE_READS),
766+
updateMetricValue(m.shuffleReadMetrics.remoteBytesReadToDisk, SHUFFLE_REMOTE_READS_TO_DISK),
767+
updateMetricValue(m.shuffleReadMetrics.localBytesRead, SHUFFLE_LOCAL_READ),
768+
updateMetricValue(m.shuffleReadMetrics.recordsRead, SHUFFLE_READ_RECORDS),
769+
updateMetricValue(m.shuffleWriteMetrics.bytesWritten, SHUFFLE_WRITE_SIZE),
770+
updateMetricValue(m.shuffleWriteMetrics.writeTime, SHUFFLE_WRITE_TIME),
771+
updateMetricValue(m.shuffleWriteMetrics.recordsWritten, SHUFFLE_WRITE_RECORDS))
772+
}
773+
713774
private def addMetrics(m1: v1.TaskMetrics, m2: v1.TaskMetrics, mult: Int): v1.TaskMetrics = {
714775
createMetrics(
715776
m1.executorDeserializeTime + m2.executorDeserializeTime * mult,

0 commit comments

Comments
 (0)