
Commit bec2068

shahidki31 authored and Marcelo Vanzin committed
[SPARK-26260][CORE] For disk store tasks summary table should show only successful tasks summary
…sks metrics for disk store

### What changes were proposed in this pull request?

After #23088, the task summary table in the stage page shows successful tasks' metrics for the InMemory store. In this PR, the same is added for the disk store.

### Why are the changes needed?

Both the InMemory and the disk store will now be consistent in showing the task summary table in the UI when there are non successful tasks.

### Does this PR introduce any user-facing change?

No.

### How was this patch tested?

Added UT. Manually verified.

Test steps:
1. Add the config in spark-defaults.conf -> **spark.history.store.path /tmp/store**
2. sbin/start-history-server.sh
3. bin/spark-shell
4. `sc.parallelize(1 to 1000, 2).map(x => throw new Exception("fail")).count`

![Screenshot 2019-11-14 at 3 51 39 AM](https://user-images.githubusercontent.com/23054875/68809546-268d2e80-0692-11ea-8b2c-bee767478135.png)

Closes #26508 from shahidki31/task.

Authored-by: shahid <shahidki31@gmail.com>
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
1 parent 29ebd93 commit bec2068
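
For context, the table this commit fixes is the per-stage quantile summary over task metrics. A minimal sketch of the intended semantics after the change — quantiles computed over successful tasks only — using a hypothetical Task case class and quantileValues helper rather than Spark's real types:

// Hypothetical mini-model: the stage summary takes quantiles over SUCCESS tasks only.
case class Task(status: String, runTimeMs: Long)

def quantileValues(tasks: Seq[Task], quantiles: Seq[Double]): Seq[Double] = {
  val succ = tasks.filter(_.status == "SUCCESS").map(_.runTimeMs).sorted
  // Same index formula as AppStatusStore: math.min((q * count).toLong, count - 1)
  quantiles.map { q =>
    val idx = math.min((q * succ.size).toLong, succ.size - 1L).toInt
    succ(idx).toDouble
  }
}

val tasks = Seq(Task("SUCCESS", 10), Task("FAILED", 999), Task("SUCCESS", 30))
println(quantileValues(tasks, Seq(0.0, 0.5, 1.0)))  // List(10.0, 30.0, 30.0)

With the failed task excluded, the 999 ms outlier no longer skews the table; before this commit the disk-store path computed the quantiles over all tasks.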

File tree

4 files changed: +234 -138 lines changed

core/src/main/scala/org/apache/spark/status/AppStatusStore.scala

Lines changed: 23 additions & 59 deletions
@@ -136,12 +136,6 @@ private[spark] class AppStatusStore(
     store.read(classOf[StageDataWrapper], Array(stageId, stageAttemptId)).locality
   }
 
-  // SPARK-26119: we only want to consider successful tasks when calculating the metrics summary,
-  // but currently this is very expensive when using a disk store. So we only trigger the slower
-  // code path when we know we have all data in memory. The following method checks whether all
-  // the data will be in memory.
-  private def isInMemoryStore: Boolean = store.isInstanceOf[InMemoryStore] || listener.isDefined
-
   /**
    * Calculates a summary of the task metrics for the given stage attempt, returning the
    * requested quantiles for the recorded metrics.
@@ -162,21 +156,11 @@ private[spark] class AppStatusStore(
     // cheaper for disk stores (avoids deserialization).
     val count = {
       Utils.tryWithResource(
-        if (isInMemoryStore) {
-          // For Live UI, we should count the tasks with status "SUCCESS" only.
-          store.view(classOf[TaskDataWrapper])
-            .parent(stageKey)
-            .index(TaskIndexNames.STATUS)
-            .first("SUCCESS")
-            .last("SUCCESS")
-            .closeableIterator()
-        } else {
-          store.view(classOf[TaskDataWrapper])
-            .parent(stageKey)
-            .index(TaskIndexNames.EXEC_RUN_TIME)
-            .first(0L)
-            .closeableIterator()
-        }
+        store.view(classOf[TaskDataWrapper])
+          .parent(stageKey)
+          .index(TaskIndexNames.EXEC_RUN_TIME)
+          .first(0L)
+          .closeableIterator()
       ) { it =>
         var _count = 0L
         while (it.hasNext()) {
@@ -245,50 +229,30 @@ private[spark] class AppStatusStore(
     // stabilize once the stage finishes. It's also slow, especially with disk stores.
     val indices = quantiles.map { q => math.min((q * count).toLong, count - 1) }
 
-    // TODO: Summary metrics needs to display all the successful tasks' metrics (SPARK-26119).
-    // For InMemory case, it is efficient to find using the following code. But for diskStore case
-    // we need an efficient solution to avoid deserialization time overhead. For that, we need to
-    // rework on the way indexing works, so that we can index by specific metrics for successful
-    // and failed tasks differently (would be tricky). Also would require changing the disk store
-    // version (to invalidate old stores).
     def scanTasks(index: String)(fn: TaskDataWrapper => Long): IndexedSeq[Double] = {
-      if (isInMemoryStore) {
-        val quantileTasks = store.view(classOf[TaskDataWrapper])
+      Utils.tryWithResource(
+        store.view(classOf[TaskDataWrapper])
           .parent(stageKey)
           .index(index)
           .first(0L)
-          .asScala
-          .filter { _.status == "SUCCESS"} // Filter "SUCCESS" tasks
-          .toIndexedSeq
-
-        indices.map { index =>
-          fn(quantileTasks(index.toInt)).toDouble
-        }.toIndexedSeq
-      } else {
-        Utils.tryWithResource(
-          store.view(classOf[TaskDataWrapper])
-            .parent(stageKey)
-            .index(index)
-            .first(0L)
-            .closeableIterator()
-        ) { it =>
-          var last = Double.NaN
-          var currentIdx = -1L
-          indices.map { idx =>
-            if (idx == currentIdx) {
-              last
-            } else {
-              val diff = idx - currentIdx
-              currentIdx = idx
-              if (it.skip(diff - 1)) {
-                last = fn(it.next()).toDouble
-                last
-              } else {
-                Double.NaN
-              }
-            }
-          }.toIndexedSeq
-        }
+          .closeableIterator()
+      ) { it =>
+        var last = Double.NaN
+        var currentIdx = -1L
+        indices.map { idx =>
+          if (idx == currentIdx) {
+            last
+          } else {
+            val diff = idx - currentIdx
+            currentIdx = idx
+            if (it.skip(diff - 1)) {
+              last = fn(it.next()).toDouble
+              last
+            } else {
+              Double.NaN
+            }
+          }
+        }.toIndexedSeq
       }
     }
@@ -582,7 +546,7 @@ private[spark] class AppStatusStore(
 
 private[spark] object AppStatusStore {
 
-  val CURRENT_VERSION = 1L
+  val CURRENT_VERSION = 2L
 
   /**
    * Create an in-memory store for a live application.
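
The scanTasks rewrite above selects each requested quantile in a single forward pass over the metric-sorted iterator, skipping the gap between consecutive indices instead of materializing every task. A self-contained sketch of that access pattern, with a plain Scala Iterator standing in for the KVStore view (Iterator#drop plays the role of KVStoreIterator#skip):

// Single-pass selection of sorted values at the given ascending indices.
def selectAtIndices(sortedValues: Iterator[Long], indices: Seq[Long]): IndexedSeq[Double] = {
  var it = sortedValues
  var last = Double.NaN
  var currentIdx = -1L
  indices.map { idx =>
    if (idx == currentIdx) {
      last  // repeated quantile index: reuse the value already read
    } else {
      val diff = idx - currentIdx
      currentIdx = idx
      it = it.drop((diff - 1).toInt)  // skip everything between the two indices
      if (it.hasNext) { last = it.next().toDouble; last } else Double.NaN
    }
  }.toIndexedSeq
}

println(selectAtIndices(Iterator(1L, 2L, 5L, 9L, 40L), Seq(0L, 2L, 2L, 4L)))
// Vector(1.0, 5.0, 5.0, 40.0)

Because the store now keeps non successful tasks' metrics negative (see LiveEntity.scala below), starting the real scan at first(0L) lands on the first successful task, so this pass never touches failed or killed tasks.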

core/src/main/scala/org/apache/spark/status/LiveEntity.scala

Lines changed: 78 additions & 24 deletions
@@ -184,6 +184,19 @@ private class LiveTask(
       info.timeRunning(lastUpdateTime.getOrElse(System.currentTimeMillis()))
     }
 
+    val hasMetrics = metrics.executorDeserializeTime >= 0
+
+    /**
+     * SPARK-26260: For non successful tasks, store the metrics as negative to avoid
+     * the calculation in the task summary. `toApi` method in the `TaskDataWrapper` will make
+     * it actual value.
+     */
+    val taskMetrics: v1.TaskMetrics = if (hasMetrics && !info.successful) {
+      makeNegative(metrics)
+    } else {
+      metrics
+    }
+
     new TaskDataWrapper(
       info.taskId,
       info.index,
@@ -199,30 +212,31 @@ private class LiveTask(
       newAccumulatorInfos(info.accumulables),
       errorMessage,
 
-      metrics.executorDeserializeTime,
-      metrics.executorDeserializeCpuTime,
-      metrics.executorRunTime,
-      metrics.executorCpuTime,
-      metrics.resultSize,
-      metrics.jvmGcTime,
-      metrics.resultSerializationTime,
-      metrics.memoryBytesSpilled,
-      metrics.diskBytesSpilled,
-      metrics.peakExecutionMemory,
-      metrics.inputMetrics.bytesRead,
-      metrics.inputMetrics.recordsRead,
-      metrics.outputMetrics.bytesWritten,
-      metrics.outputMetrics.recordsWritten,
-      metrics.shuffleReadMetrics.remoteBlocksFetched,
-      metrics.shuffleReadMetrics.localBlocksFetched,
-      metrics.shuffleReadMetrics.fetchWaitTime,
-      metrics.shuffleReadMetrics.remoteBytesRead,
-      metrics.shuffleReadMetrics.remoteBytesReadToDisk,
-      metrics.shuffleReadMetrics.localBytesRead,
-      metrics.shuffleReadMetrics.recordsRead,
-      metrics.shuffleWriteMetrics.bytesWritten,
-      metrics.shuffleWriteMetrics.writeTime,
-      metrics.shuffleWriteMetrics.recordsWritten,
+      hasMetrics,
+      taskMetrics.executorDeserializeTime,
+      taskMetrics.executorDeserializeCpuTime,
+      taskMetrics.executorRunTime,
+      taskMetrics.executorCpuTime,
+      taskMetrics.resultSize,
+      taskMetrics.jvmGcTime,
+      taskMetrics.resultSerializationTime,
+      taskMetrics.memoryBytesSpilled,
+      taskMetrics.diskBytesSpilled,
+      taskMetrics.peakExecutionMemory,
+      taskMetrics.inputMetrics.bytesRead,
+      taskMetrics.inputMetrics.recordsRead,
+      taskMetrics.outputMetrics.bytesWritten,
+      taskMetrics.outputMetrics.recordsWritten,
+      taskMetrics.shuffleReadMetrics.remoteBlocksFetched,
+      taskMetrics.shuffleReadMetrics.localBlocksFetched,
+      taskMetrics.shuffleReadMetrics.fetchWaitTime,
+      taskMetrics.shuffleReadMetrics.remoteBytesRead,
+      taskMetrics.shuffleReadMetrics.remoteBytesReadToDisk,
+      taskMetrics.shuffleReadMetrics.localBytesRead,
+      taskMetrics.shuffleReadMetrics.recordsRead,
+      taskMetrics.shuffleWriteMetrics.bytesWritten,
+      taskMetrics.shuffleWriteMetrics.writeTime,
+      taskMetrics.shuffleWriteMetrics.recordsWritten,
 
       stageId,
       stageAttemptId)
@@ -710,6 +724,46 @@ private object LiveEntityHelpers {
     addMetrics(m1, m2, -1)
   }
 
+  /**
+   * Convert all the metric values to negative as well as handle zero values.
+   * This method assumes that all the metric values are greater than or equal to zero
+   */
+  def makeNegative(m: v1.TaskMetrics): v1.TaskMetrics = {
+    // To handle 0 metric value, add 1 and make the metric negative.
+    // To recover actual value do `math.abs(metric + 1)`
+    // Eg: if the metric values are (5, 3, 0, 1) => Updated metric values will be (-6, -4, -1, -2)
+    // To get actual metric value, do math.abs(metric + 1) => (5, 3, 0, 1)
+    def updateMetricValue(metric: Long): Long = {
+      metric * -1L - 1L
+    }
+
+    createMetrics(
+      updateMetricValue(m.executorDeserializeTime),
+      updateMetricValue(m.executorDeserializeCpuTime),
+      updateMetricValue(m.executorRunTime),
+      updateMetricValue(m.executorCpuTime),
+      updateMetricValue(m.resultSize),
+      updateMetricValue(m.jvmGcTime),
+      updateMetricValue(m.resultSerializationTime),
+      updateMetricValue(m.memoryBytesSpilled),
+      updateMetricValue(m.diskBytesSpilled),
+      updateMetricValue(m.peakExecutionMemory),
+      updateMetricValue(m.inputMetrics.bytesRead),
+      updateMetricValue(m.inputMetrics.recordsRead),
+      updateMetricValue(m.outputMetrics.bytesWritten),
+      updateMetricValue(m.outputMetrics.recordsWritten),
+      updateMetricValue(m.shuffleReadMetrics.remoteBlocksFetched),
+      updateMetricValue(m.shuffleReadMetrics.localBlocksFetched),
+      updateMetricValue(m.shuffleReadMetrics.fetchWaitTime),
+      updateMetricValue(m.shuffleReadMetrics.remoteBytesRead),
+      updateMetricValue(m.shuffleReadMetrics.remoteBytesReadToDisk),
+      updateMetricValue(m.shuffleReadMetrics.localBytesRead),
+      updateMetricValue(m.shuffleReadMetrics.recordsRead),
+      updateMetricValue(m.shuffleWriteMetrics.bytesWritten),
+      updateMetricValue(m.shuffleWriteMetrics.writeTime),
+      updateMetricValue(m.shuffleWriteMetrics.recordsWritten))
+  }
+
   private def addMetrics(m1: v1.TaskMetrics, m2: v1.TaskMetrics, mult: Int): v1.TaskMetrics = {
     createMetrics(
       m1.executorDeserializeTime + m2.executorDeserializeTime * mult,
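
The encode/decode pair this hunk introduces is easy to sanity-check in isolation. A small sketch with standalone encode/decode functions (the real code routes these through createMetrics and v1.TaskMetrics):

// Encoding used by makeNegative: shift by one so a 0 metric becomes -1,
// keeping every non successful task's metric strictly below zero.
def encode(metric: Long): Long = metric * -1L - 1L  // mirrors updateMetricValue
// Decoding used by TaskDataWrapper.getMetricValue.
def decode(metric: Long): Long = math.abs(metric + 1)

val raw = Seq(5L, 3L, 0L, 1L)
val stored = raw.map(encode)
println(stored)              // List(-6, -4, -1, -2)
println(stored.map(decode))  // List(5, 3, 0, 1): the round trip is exact

The +1 shift is what makes the trick work for 0-valued metrics: plain negation would leave 0 unchanged, and a failed task with a 0 metric would still be picked up by an index scan starting at 0.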

core/src/main/scala/org/apache/spark/status/storeTypes.scala

Lines changed: 44 additions & 32 deletions
@@ -177,10 +177,13 @@ private[spark] class TaskDataWrapper(
     val accumulatorUpdates: Seq[AccumulableInfo],
     val errorMessage: Option[String],
 
+    val hasMetrics: Boolean,
     // The following is an exploded view of a TaskMetrics API object. This saves 5 objects
-    // (= 80 bytes of Java object overhead) per instance of this wrapper. If the first value
-    // (executorDeserializeTime) is -1L, it means the metrics for this task have not been
-    // recorded.
+    // (= 80 bytes of Java object overhead) per instance of this wrapper. Non successful
+    // tasks' metrics will have negative values in `TaskDataWrapper`. `TaskData` will have
+    // actual metric values. To recover the actual metric value from `TaskDataWrapper`,
+    // need use `getMetricValue` method. If `hasMetrics` is false, it means the metrics
+    // for this task have not been recorded.
     @KVIndexParam(value = TaskIndexNames.DESER_TIME, parent = TaskIndexNames.STAGE)
     val executorDeserializeTime: Long,
     @KVIndexParam(value = TaskIndexNames.DESER_CPU_TIME, parent = TaskIndexNames.STAGE)
@@ -233,39 +236,46 @@ private[spark] class TaskDataWrapper(
     val stageId: Int,
     val stageAttemptId: Int) {
 
-  def hasMetrics: Boolean = executorDeserializeTime >= 0
+  // SPARK-26260: To handle non successful tasks metrics (Running, Failed, Killed).
+  private def getMetricValue(metric: Long): Long = {
+    if (status != "SUCCESS") {
+      math.abs(metric + 1)
+    } else {
+      metric
+    }
+  }
 
   def toApi: TaskData = {
     val metrics = if (hasMetrics) {
       Some(new TaskMetrics(
-        executorDeserializeTime,
-        executorDeserializeCpuTime,
-        executorRunTime,
-        executorCpuTime,
-        resultSize,
-        jvmGcTime,
-        resultSerializationTime,
-        memoryBytesSpilled,
-        diskBytesSpilled,
-        peakExecutionMemory,
+        getMetricValue(executorDeserializeTime),
+        getMetricValue(executorDeserializeCpuTime),
+        getMetricValue(executorRunTime),
+        getMetricValue(executorCpuTime),
+        getMetricValue(resultSize),
+        getMetricValue(jvmGcTime),
+        getMetricValue(resultSerializationTime),
+        getMetricValue(memoryBytesSpilled),
+        getMetricValue(diskBytesSpilled),
+        getMetricValue(peakExecutionMemory),
         new InputMetrics(
-          inputBytesRead,
-          inputRecordsRead),
+          getMetricValue(inputBytesRead),
+          getMetricValue(inputRecordsRead)),
         new OutputMetrics(
-          outputBytesWritten,
-          outputRecordsWritten),
+          getMetricValue(outputBytesWritten),
+          getMetricValue(outputRecordsWritten)),
         new ShuffleReadMetrics(
-          shuffleRemoteBlocksFetched,
-          shuffleLocalBlocksFetched,
-          shuffleFetchWaitTime,
-          shuffleRemoteBytesRead,
-          shuffleRemoteBytesReadToDisk,
-          shuffleLocalBytesRead,
-          shuffleRecordsRead),
+          getMetricValue(shuffleRemoteBlocksFetched),
+          getMetricValue(shuffleLocalBlocksFetched),
+          getMetricValue(shuffleFetchWaitTime),
+          getMetricValue(shuffleRemoteBytesRead),
+          getMetricValue(shuffleRemoteBytesReadToDisk),
+          getMetricValue(shuffleLocalBytesRead),
+          getMetricValue(shuffleRecordsRead)),
         new ShuffleWriteMetrics(
-          shuffleBytesWritten,
-          shuffleWriteTime,
-          shuffleRecordsWritten)))
+          getMetricValue(shuffleBytesWritten),
+          getMetricValue(shuffleWriteTime),
+          getMetricValue(shuffleRecordsWritten))))
     } else {
       None
     }
@@ -296,8 +306,10 @@ private[spark] class TaskDataWrapper(
   @JsonIgnore @KVIndex(value = TaskIndexNames.SCHEDULER_DELAY, parent = TaskIndexNames.STAGE)
   def schedulerDelay: Long = {
     if (hasMetrics) {
-      AppStatusUtils.schedulerDelay(launchTime, resultFetchStart, duration, executorDeserializeTime,
-        resultSerializationTime, executorRunTime)
+      AppStatusUtils.schedulerDelay(launchTime, resultFetchStart, duration,
+        getMetricValue(executorDeserializeTime),
+        getMetricValue(resultSerializationTime),
+        getMetricValue(executorRunTime))
     } else {
       -1L
     }
@@ -330,7 +342,7 @@ private[spark] class TaskDataWrapper(
   @JsonIgnore @KVIndex(value = TaskIndexNames.SHUFFLE_TOTAL_READS, parent = TaskIndexNames.STAGE)
   private def shuffleTotalReads: Long = {
     if (hasMetrics) {
-      shuffleLocalBytesRead + shuffleRemoteBytesRead
+      getMetricValue(shuffleLocalBytesRead) + getMetricValue(shuffleRemoteBytesRead)
     } else {
       -1L
     }
@@ -339,7 +351,7 @@ private[spark] class TaskDataWrapper(
   @JsonIgnore @KVIndex(value = TaskIndexNames.SHUFFLE_TOTAL_BLOCKS, parent = TaskIndexNames.STAGE)
   private def shuffleTotalBlocks: Long = {
     if (hasMetrics) {
-      shuffleLocalBlocksFetched + shuffleRemoteBlocksFetched
+      getMetricValue(shuffleLocalBlocksFetched) + getMetricValue(shuffleRemoteBlocksFetched)
    } else {
      -1L
    }
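
Since every metric of a non successful task is stored strictly negative, any metric-sorted index scan that starts at 0 sees only successful tasks — that is what lets both the count and the scanTasks paths in AppStatusStore work against a disk store without per-row status checks. A toy illustration of the ordering property, with made-up values:

// Stored executorRunTime values: successful tasks keep their values (>= 0),
// non successful ones were encoded negative by makeNegative.
val storedRunTimes = Seq(120L, -31L, 0L, -1L, 45L).sorted  // List(-31, -1, 0, 45, 120)
// Equivalent of .index(TaskIndexNames.EXEC_RUN_TIME).first(0L):
val successfulOnly = storedRunTimes.dropWhile(_ < 0L)
println(successfulOnly)  // List(0, 45, 120): only successful tasks remain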
