Commit 48d04f7

gengliangwang authored and Marcelo Vanzin committed
[SPARK-28638][WEBUI] Task summary should only contain successful tasks' metrics
## What changes were proposed in this pull request?

Currently, on requesting summary metrics, cached data are returned if the current number of "SUCCESS" tasks is the same as the value in the cached data. However, the number of "SUCCESS" tasks is computed incorrectly while there are running tasks: in `AppStatusStore`, the KVStore is an `ElementTrackingStore` rather than an `InMemoryStore`, so the computed count is always the number of "SUCCESS" tasks plus "RUNNING" tasks. Thus, even after the running tasks have finished, the out-of-date cached data is still returned.

This PR fixes the code that counts the number of "SUCCESS" tasks.

## How was this patch tested?

Tested manually: run

```
sc.parallelize(1 to 160, 40).map(i => Thread.sleep(i*100)).collect()
```

and keep refreshing the stage page; the task summary metrics shown are wrong.

### Before fix:

![image](https://user-images.githubusercontent.com/1097932/62560343-6a141780-b8af-11e9-8942-d88540659a93.png)

### After fix:

![image](https://user-images.githubusercontent.com/1097932/62560355-7009f880-b8af-11e9-8ba8-10c083a48d7b.png)

Closes #25369 from gengliangwang/fixStagePage.

Authored-by: Gengliang Wang <gengliang.wang@databricks.com>
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
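For context, here is a minimal, self-contained sketch of the miscount, assuming Spark's `org.apache.spark.util.kvstore` API; the `TaskRow` class and `CountSketch` object are hypothetical stand-ins for `TaskDataWrapper`, not Spark code:

```scala
import scala.annotation.meta.getter

import org.apache.spark.util.kvstore.{InMemoryStore, KVIndex}

// Hypothetical stand-in for Spark's TaskDataWrapper: the bare @KVIndex marks
// the natural index; "status" is a secondary index (like TaskIndexNames.STATUS).
class TaskRow(
    @(KVIndex @getter) val id: java.lang.Long,
    @(KVIndex @getter)("status") val status: String)

object CountSketch extends App {
  val store = new InMemoryStore()
  store.write(new TaskRow(1L, "SUCCESS"))
  store.write(new TaskRow(2L, "SUCCESS"))
  store.write(new TaskRow(3L, "RUNNING"))

  // A raw count over the type sees every stored task, finished or not...
  println(store.count(classOf[TaskRow]))                      // 3
  // ...while a count restricted to the status index is what the summary needs.
  println(store.count(classOf[TaskRow], "status", "SUCCESS")) // 2
}
```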
1 parent 6964128 commit 48d04f7

File tree

2 files changed: +30 −13 lines changed


core/src/main/scala/org/apache/spark/status/AppStatusStore.scala

Lines changed: 9 additions & 2 deletions
```diff
@@ -136,6 +136,12 @@ private[spark] class AppStatusStore(
     store.read(classOf[StageDataWrapper], Array(stageId, stageAttemptId)).locality
   }
 
+  // SPARK-26119: we only want to consider successful tasks when calculating the metrics summary,
+  // but currently this is very expensive when using a disk store. So we only trigger the slower
+  // code path when we know we have all data in memory. The following method checks whether all
+  // the data will be in memory.
+  private def isInMemoryStore: Boolean = store.isInstanceOf[InMemoryStore] || listener.isDefined
+
   /**
    * Calculates a summary of the task metrics for the given stage attempt, returning the
    * requested quantiles for the recorded metrics.
@@ -156,7 +162,8 @@ private[spark] class AppStatusStore(
     // cheaper for disk stores (avoids deserialization).
     val count = {
       Utils.tryWithResource(
-        if (store.isInstanceOf[InMemoryStore]) {
+        if (isInMemoryStore) {
+          // For Live UI, we should count the tasks with status "SUCCESS" only.
           store.view(classOf[TaskDataWrapper])
             .parent(stageKey)
             .index(TaskIndexNames.STATUS)
@@ -245,7 +252,7 @@ private[spark] class AppStatusStore(
     // and failed tasks differently (would be tricky). Also would require changing the disk store
     // version (to invalidate old stores).
     def scanTasks(index: String)(fn: TaskDataWrapper => Long): IndexedSeq[Double] = {
-      if (store.isInstanceOf[InMemoryStore]) {
+      if (isInMemoryStore) {
         val quantileTasks = store.view(classOf[TaskDataWrapper])
           .parent(stageKey)
           .index(index)
```
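For readers unfamiliar with the KVStore view API used in the hunk above, here is a condensed sketch of the SUCCESS-only counting path, assuming the `first`/`last` bounds of `KVStoreView` (names such as `store`, `stageKey`, and `TaskIndexNames` are the ones from `AppStatusStore`; a sketch, not the verbatim Spark source):

```scala
// Bounding the STATUS index scan to "SUCCESS" on both ends means RUNNING
// tasks are never visited, so the count stays correct while tasks run.
val count = Utils.tryWithResource(
  store.view(classOf[TaskDataWrapper])
    .parent(stageKey)
    .index(TaskIndexNames.STATUS)
    .first("SUCCESS")
    .last("SUCCESS")
    .closeableIterator()
) { it =>
  var n = 0L
  while (it.hasNext()) {
    n += 1
    it.next()
  }
  n
}
```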

core/src/test/scala/org/apache/spark/status/AppStatusStoreSuite.scala

Lines changed: 21 additions & 11 deletions
```diff
@@ -17,8 +17,7 @@
 
 package org.apache.spark.status
 
-import org.apache.spark.SparkFunSuite
-import org.apache.spark.status.api.v1.TaskMetricDistributions
+import org.apache.spark.{SparkConf, SparkFunSuite}
 import org.apache.spark.util.Distribution
 import org.apache.spark.util.kvstore._
 
@@ -77,14 +76,23 @@ class AppStatusStoreSuite extends SparkFunSuite {
     assert(store.count(classOf[CachedQuantile]) === 2)
   }
 
-  test("only successfull task have taskSummary") {
+  private def createLiveStore(inMemoryStore: InMemoryStore): AppStatusStore = {
+    val conf = new SparkConf()
+    val store = new ElementTrackingStore(inMemoryStore, conf)
+    val listener = new AppStatusListener(store, conf, true, None)
+    new AppStatusStore(store, listener = Some(listener))
+  }
+
+  test("SPARK-28638: only successful tasks have taskSummary when with in memory kvstore") {
     val store = new InMemoryStore()
     (0 until 5).foreach { i => store.write(newTaskData(i, status = "FAILED")) }
-    val appStore = new AppStatusStore(store).taskSummary(stageId, attemptId, uiQuantiles)
-    assert(appStore.size === 0)
+    Seq(new AppStatusStore(store), createLiveStore(store)).foreach { appStore =>
+      val summary = appStore.taskSummary(stageId, attemptId, uiQuantiles)
+      assert(summary.size === 0)
+    }
   }
 
-  test("summary should contain task metrics of only successfull tasks") {
+  test("SPARK-28638: summary should contain successful tasks only when with in memory kvstore") {
     val store = new InMemoryStore()
 
     for (i <- 0 to 5) {
@@ -95,13 +103,15 @@ class AppStatusStoreSuite extends SparkFunSuite {
       }
     }
 
-    val summary = new AppStatusStore(store).taskSummary(stageId, attemptId, uiQuantiles).get
+    Seq(new AppStatusStore(store), createLiveStore(store)).foreach { appStore =>
+      val summary = appStore.taskSummary(stageId, attemptId, uiQuantiles).get
 
-    val values = Array(0.0, 2.0, 4.0)
+      val values = Array(0.0, 2.0, 4.0)
 
-    val dist = new Distribution(values, 0, values.length).getQuantiles(uiQuantiles.sorted)
-    dist.zip(summary.executorRunTime).foreach { case (expected, actual) =>
-      assert(expected === actual)
+      val dist = new Distribution(values, 0, values.length).getQuantiles(uiQuantiles.sorted)
+      dist.zip(summary.executorRunTime).foreach { case (expected, actual) =>
+        assert(expected === actual)
+      }
     }
   }
```
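Note the design of the updated tests: each assertion now runs against both `new AppStatusStore(store)` (a plain `InMemoryStore`, the disk-like path) and `createLiveStore(store)` (the same `InMemoryStore` wrapped in an `ElementTrackingStore` with a live `AppStatusListener`), so both sides of the new `isInMemoryStore` check are exercised.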
