@@ -182,8 +182,7 @@ private[spark] class TaskDataWrapper(
 
     val hasMetrics: Boolean,
     // Non successful metrics will have negative values in `TaskDataWrapper`.
-    // zero metric value will be converted to -1 and update the index in the hashset.
-    // However `TaskData` will have actual metric values. To recover the actual metric value
+    // `TaskData` will have actual metric values. To recover the actual metric value
     // from `TaskDataWrapper`, need use `getMetricValue` method. parameter `handleZero` is to
     // check whether the index has zero metric value, which is used in the `getMetricValue`.
     val handleZero: HashSet[String],
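For orientation, here is a minimal, self-contained Scala sketch of the scheme the comment describes: a real value of 0 is written as -1 and its index name recorded in `handleZero`, so it can later be told apart from genuinely negative (non-successful) values. The `handleZero` check mirrors the helper in the hunk below; the `math.abs` in the else branch and the name `recoverMetric` are assumptions for illustration, not Spark's API.

```scala
import scala.collection.immutable.HashSet

object HandleZeroSketch {

  // Sketch of the recovery described in the comment: indices listed in
  // `handleZero` really held zero (they were written as -1 so the
  // "negative means non-successful" convention still holds); everything
  // else is assumed recoverable by dropping the sign.
  def recoverMetric(handleZero: HashSet[String], stored: Long, index: String): Long =
    if (handleZero(index)) 0L else math.abs(stored) // abs is an assumption

  def main(args: Array[String]): Unit = {
    // A running (non-successful) task reported runTime = 0 and gcTime = 42:
    // the wrapper would hold runTime as -1 (index recorded) and gcTime as -42.
    val handleZero = HashSet("runTime")
    println(recoverMetric(handleZero, -1L, "runTime")) // 0
    println(recoverMetric(handleZero, -42L, "gcTime")) // 42
  }
}
```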
@@ -244,7 +243,7 @@ private[spark] class TaskDataWrapper(
     val stageAttemptId: Int) {
 
   // To handle non successful tasks metrics (Running, Failed, Killed).
-  private def gerMetricValue(metric: Long, index: String): Long = {
+  private def getMetricValue(metric: Long, index: String): Long = {
     if (handleZero(index)) {
       0L
     } else {
@@ -255,34 +254,35 @@ private[spark] class TaskDataWrapper(
   def toApi: TaskData = {
     val metrics = if (hasMetrics) {
       Some(new TaskMetrics(
-        gerMetricValue(executorDeserializeTime, TaskIndexNames.DESER_TIME),
-        gerMetricValue(executorDeserializeCpuTime, TaskIndexNames.DESER_CPU_TIME),
-        gerMetricValue(executorRunTime, TaskIndexNames.EXEC_RUN_TIME),
-        gerMetricValue(executorCpuTime, TaskIndexNames.EXEC_CPU_TIME),
-        gerMetricValue(resultSize, TaskIndexNames.RESULT_SIZE),
-        gerMetricValue(jvmGcTime, TaskIndexNames.GC_TIME),
-        gerMetricValue(resultSerializationTime, TaskIndexNames.SER_TIME),
-        gerMetricValue(memoryBytesSpilled, TaskIndexNames.MEM_SPILL),
-        gerMetricValue(diskBytesSpilled, TaskIndexNames.DISK_SPILL),
-        gerMetricValue(peakExecutionMemory, TaskIndexNames.PEAK_MEM),
+        getMetricValue(executorDeserializeTime, TaskIndexNames.DESER_TIME),
+        getMetricValue(executorDeserializeCpuTime, TaskIndexNames.DESER_CPU_TIME),
+        getMetricValue(executorRunTime, TaskIndexNames.EXEC_RUN_TIME),
+        getMetricValue(executorCpuTime, TaskIndexNames.EXEC_CPU_TIME),
+        getMetricValue(resultSize, TaskIndexNames.RESULT_SIZE),
+        getMetricValue(jvmGcTime, TaskIndexNames.GC_TIME),
+        getMetricValue(resultSerializationTime, TaskIndexNames.SER_TIME),
+        getMetricValue(memoryBytesSpilled, TaskIndexNames.MEM_SPILL),
+        getMetricValue(diskBytesSpilled, TaskIndexNames.DISK_SPILL),
+        getMetricValue(peakExecutionMemory, TaskIndexNames.PEAK_MEM),
         new InputMetrics(
-          gerMetricValue(inputBytesRead, TaskIndexNames.INPUT_SIZE),
-          gerMetricValue(inputRecordsRead, TaskIndexNames.INPUT_RECORDS)),
+          getMetricValue(inputBytesRead, TaskIndexNames.INPUT_SIZE),
+          getMetricValue(inputRecordsRead, TaskIndexNames.INPUT_RECORDS)),
         new OutputMetrics(
-          gerMetricValue(outputBytesWritten, TaskIndexNames.OUTPUT_SIZE),
-          gerMetricValue(outputRecordsWritten, TaskIndexNames.OUTPUT_RECORDS)),
+          getMetricValue(outputBytesWritten, TaskIndexNames.OUTPUT_SIZE),
+          getMetricValue(outputRecordsWritten, TaskIndexNames.OUTPUT_RECORDS)),
         new ShuffleReadMetrics(
-          gerMetricValue(shuffleRemoteBlocksFetched, TaskIndexNames.SHUFFLE_REMOTE_BLOCKS),
-          gerMetricValue(shuffleLocalBlocksFetched, TaskIndexNames.SHUFFLE_LOCAL_BLOCKS),
-          gerMetricValue(shuffleFetchWaitTime, TaskIndexNames.SHUFFLE_READ_TIME),
-          gerMetricValue(shuffleRemoteBytesRead, TaskIndexNames.SHUFFLE_REMOTE_READS),
-          gerMetricValue(shuffleRemoteBytesReadToDisk, TaskIndexNames.SHUFFLE_REMOTE_READS_TO_DISK),
-          gerMetricValue(shuffleLocalBytesRead, TaskIndexNames.SHUFFLE_LOCAL_READ),
-          gerMetricValue(shuffleRecordsRead, TaskIndexNames.SHUFFLE_READ_RECORDS)),
+          getMetricValue(shuffleRemoteBlocksFetched, TaskIndexNames.SHUFFLE_REMOTE_BLOCKS),
+          getMetricValue(shuffleLocalBlocksFetched, TaskIndexNames.SHUFFLE_LOCAL_BLOCKS),
+          getMetricValue(shuffleFetchWaitTime, TaskIndexNames.SHUFFLE_READ_TIME),
+          getMetricValue(shuffleRemoteBytesRead, TaskIndexNames.SHUFFLE_REMOTE_READS),
+          getMetricValue(shuffleRemoteBytesReadToDisk,
+            TaskIndexNames.SHUFFLE_REMOTE_READS_TO_DISK),
+          getMetricValue(shuffleLocalBytesRead, TaskIndexNames.SHUFFLE_LOCAL_READ),
+          getMetricValue(shuffleRecordsRead, TaskIndexNames.SHUFFLE_READ_RECORDS)),
         new ShuffleWriteMetrics(
-          gerMetricValue(shuffleBytesWritten, TaskIndexNames.SHUFFLE_WRITE_SIZE),
-          gerMetricValue(shuffleWriteTime, TaskIndexNames.SHUFFLE_WRITE_TIME),
-          gerMetricValue(shuffleRecordsWritten, TaskIndexNames.SHUFFLE_WRITE_RECORDS))))
+          getMetricValue(shuffleBytesWritten, TaskIndexNames.SHUFFLE_WRITE_SIZE),
+          getMetricValue(shuffleWriteTime, TaskIndexNames.SHUFFLE_WRITE_TIME),
+          getMetricValue(shuffleRecordsWritten, TaskIndexNames.SHUFFLE_WRITE_RECORDS))))
     } else {
       None
     }
@@ -314,9 +314,9 @@ private[spark] class TaskDataWrapper(
   def schedulerDelay: Long = {
     if (hasMetrics) {
       AppStatusUtils.schedulerDelay(launchTime, resultFetchStart, duration,
-        gerMetricValue(executorDeserializeTime, TaskIndexNames.DESER_TIME),
-        gerMetricValue(resultSerializationTime, TaskIndexNames.SER_TIME),
-        gerMetricValue(executorRunTime, TaskIndexNames.EXEC_RUN_TIME))
+        getMetricValue(executorDeserializeTime, TaskIndexNames.DESER_TIME),
+        getMetricValue(resultSerializationTime, TaskIndexNames.SER_TIME),
+        getMetricValue(executorRunTime, TaskIndexNames.EXEC_RUN_TIME))
     } else {
       -1L
     }
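As a side note, here is a rough sketch of how a scheduler-delay figure is typically derived from these inputs. This is a hypothetical illustration, not the actual `AppStatusUtils.schedulerDelay` implementation; the getting-result term and the clamp to 0 are assumptions.

```scala
object SchedulerDelaySketch {
  // Hypothetical derivation: whatever part of the task's wall-clock duration
  // is not spent fetching the result, deserializing, running, or serializing
  // the result is attributed to scheduler delay.
  def schedulerDelay(
      launchTime: Long,
      resultFetchStart: Long,
      duration: Long,
      deserializeTime: Long,
      serializationTime: Long,
      runTime: Long): Long = {
    val gettingResultTime =
      if (resultFetchStart > 0) launchTime + duration - resultFetchStart else 0L
    math.max(0L, duration - runTime - deserializeTime - serializationTime - gettingResultTime)
  }

  def main(args: Array[String]): Unit = {
    // 1000 ms task: 700 run + 100 deser + 50 ser, no result fetch => 150 ms delay.
    println(schedulerDelay(launchTime = 0L, resultFetchStart = -1L, duration = 1000L,
      deserializeTime = 100L, serializationTime = 50L, runTime = 700L))
  }
}
```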
@@ -349,8 +349,8 @@ private[spark] class TaskDataWrapper(
   @JsonIgnore @KVIndex(value = TaskIndexNames.SHUFFLE_TOTAL_READS, parent = TaskIndexNames.STAGE)
   private def shuffleTotalReads: Long = {
     if (hasMetrics) {
-      gerMetricValue(shuffleLocalBytesRead, TaskIndexNames.SHUFFLE_LOCAL_READ) +
-        gerMetricValue(shuffleRemoteBytesRead, TaskIndexNames.SHUFFLE_REMOTE_READS)
+      getMetricValue(shuffleLocalBytesRead, TaskIndexNames.SHUFFLE_LOCAL_READ) +
+        getMetricValue(shuffleRemoteBytesRead, TaskIndexNames.SHUFFLE_REMOTE_READS)
     } else {
       -1L
     }
@@ -359,8 +359,8 @@ private[spark] class TaskDataWrapper(
   @JsonIgnore @KVIndex(value = TaskIndexNames.SHUFFLE_TOTAL_BLOCKS, parent = TaskIndexNames.STAGE)
   private def shuffleTotalBlocks: Long = {
     if (hasMetrics) {
-      gerMetricValue(shuffleLocalBlocksFetched, TaskIndexNames.SHUFFLE_LOCAL_BLOCKS) +
-        gerMetricValue(shuffleRemoteBlocksFetched, TaskIndexNames.SHUFFLE_REMOTE_BLOCKS)
+      getMetricValue(shuffleLocalBlocksFetched, TaskIndexNames.SHUFFLE_LOCAL_BLOCKS) +
+        getMetricValue(shuffleRemoteBlocksFetched, TaskIndexNames.SHUFFLE_REMOTE_BLOCKS)
     } else {
       -1L
     }
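Finally, a small hypothetical illustration of why these derived `@KVIndex` getters return `-1L` when there are no metrics: the index value must always be defined, and `-1L` keeps metric-less tasks grouped at the low end when the store orders rows by this index. Plain Scala sorting stands in for the KV store here; `Row` and `indexValue` are made-up names.

```scala
object SentinelSortSketch {
  // Hypothetical stand-in for a task row with an optional shuffle-read total.
  final case class Row(taskId: Long, shuffleTotalReads: Option[Long])

  // Same idea as the diff: a defined total, or -1L when metrics are absent.
  def indexValue(r: Row): Long = r.shuffleTotalReads.getOrElse(-1L)

  def main(args: Array[String]): Unit = {
    val rows = Seq(Row(1, Some(400L)), Row(2, None), Row(3, Some(25L)))
    // Sorting by the index value puts the metric-less task first, the way an
    // ascending scan over SHUFFLE_TOTAL_READS would.
    println(rows.sortBy(indexValue).map(_.taskId)) // List(2, 3, 1)
  }
}
```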