Skip to content

Commit b0988c3

Browse files
committed
update
1 parent e8d0d14 commit b0988c3

File tree

3 files changed

+14
-13
lines changed

3 files changed

+14
-13
lines changed

core/src/main/scala/org/apache/spark/status/LiveEntity.scala

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -190,8 +190,9 @@ private class LiveTask(
190190
val handleZeros = mutable.HashSet[String]()
191191

192192
/**
193-
* For non successful tasks, store the metrics as negative to avoid the calculation in the
194-
* task summary. `toApi` method in TaskDataWrapper will make it the actual value.
193+
* SPARK-26260: For non successful tasks, store the metrics as negative to avoid
194+
* the calculation in the task summary. `toApi` method in the `TaskDataWrapper` will make
195+
* it the actual value.
195196
*/
196197
val taskMetrics: v1.TaskMetrics = if (hasMetrics && !info.successful) {
197198
makeNegative(metrics, handleZeros)

core/src/main/scala/org/apache/spark/status/storeTypes.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -181,7 +181,7 @@ private[spark] class TaskDataWrapper(
181181
val errorMessage: Option[String],
182182

183183
val hasMetrics: Boolean,
184-
// Non successful metrics will have negative values in `TaskDataWrapper`.
184+
// Non successful metrics will now have negative values in `TaskDataWrapper`.
185185
// `TaskData` will have actual metric values. To recover the actual metric value
186186
// from `TaskDataWrapper`, need use `getMetricValue` method. parameter `handleZero` is to
187187
// check whether the index has zero metric value, which is used in the `getMetricValue`.
@@ -242,7 +242,7 @@ private[spark] class TaskDataWrapper(
242242
val stageId: Int,
243243
val stageAttemptId: Int) {
244244

245-
// To handle non successful tasks metrics (Running, Failed, Killed).
245+
// SPARK-26260: To handle non successful tasks metrics (Running, Failed, Killed).
246246
private def getMetricValue(metric: Long, index: String): Long = {
247247
if (handleZero(index)) {
248248
0L

core/src/test/scala/org/apache/spark/status/AppStatusStoreSuite.scala

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -133,15 +133,6 @@ class AppStatusStoreSuite extends SparkFunSuite {
133133
}
134134
}
135135

136-
test("task summary size for default metrics should be zero") {
137-
val store = new InMemoryStore()
138-
(0 until 5).foreach { _ => store.write(newTaskData(-1, status = "RUNNING")) }
139-
Seq(new AppStatusStore(store), createLiveStore(store)).foreach { appStore =>
140-
val summary = appStore.taskSummary(stageId, attemptId, uiQuantiles)
141-
assert(summary.size === 0)
142-
}
143-
}
144-
145136
test("SPARK-26260: summary should contain successful tasks only when with LevelDB store") {
146137
val testDir = Utils.createTempDir()
147138
val diskStore = KVUtils.open(testDir, getClass().getName())
@@ -168,6 +159,15 @@ class AppStatusStoreSuite extends SparkFunSuite {
168159
Utils.deleteRecursively(testDir)
169160
}
170161

162+
test("SPARK-26260: task summary size for default metrics should be zero") {
163+
val store = new InMemoryStore()
164+
store.write(newTaskData(-1, status = "RUNNING"))
165+
Seq(new AppStatusStore(store), createLiveStore(store)).foreach { appStore =>
166+
val summary = appStore.taskSummary(stageId, attemptId, uiQuantiles)
167+
assert(summary.size === 0)
168+
}
169+
}
170+
171171
private def compareQuantiles(count: Int, quantiles: Array[Double]): Unit = {
172172
val store = new InMemoryStore()
173173
val values = (0 until count).map { i =>

0 commit comments

Comments
 (0)