[SPARK-28638][WebUI] Task summary should only contain successful tasks' metrics #25369

Status: Closed · wants to merge 5 commits
11 changes: 9 additions & 2 deletions core/src/main/scala/org/apache/spark/status/AppStatusStore.scala
@@ -136,6 +136,12 @@ private[spark] class AppStatusStore(
store.read(classOf[StageDataWrapper], Array(stageId, stageAttemptId)).locality
}

// SPARK-26119: we only want to consider successful tasks when calculating the metrics summary,
// but currently this is very expensive when using a disk store. So we only trigger the slower
// code path when we know we have all data in memory. The following method checks whether all
// the data will be in memory.
private def isInMemoryStore: Boolean = store.isInstanceOf[InMemoryStore] || listener.isDefined
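To make the two-clause check concrete, here is a minimal, hypothetical sketch (not Spark code; the `Deployment` type is invented for illustration) of the three deployment scenarios it distinguishes. In a live UI the store is actually an `ElementTrackingStore` wrapping an `InMemoryStore`, so the `isInstanceOf` test alone would be false there; the `listener.isDefined` clause is what covers the live UI, while the history server has no listener and may use either an in-memory or a disk-backed store.

```scala
// Hypothetical model of the three scenarios the predicate must classify.
case class Deployment(storeIsInMemoryInstance: Boolean, hasListener: Boolean)

// Live UI: store is an ElementTrackingStore (instanceOf check false), listener present.
val liveUI       = Deployment(storeIsInMemoryInstance = false, hasListener = true)
// History server replaying into an InMemoryStore: no listener.
val shsInMemory  = Deployment(storeIsInMemoryInstance = true,  hasListener = false)
// History server with a disk-backed store: neither clause holds.
val shsDiskStore = Deployment(storeIsInMemoryInstance = false, hasListener = false)

// Mirrors the predicate: only the disk-backed SHS store is kept off the
// slower success-only code path.
def allDataInMemory(d: Deployment): Boolean =
  d.storeIsInMemoryInstance || d.hasListener
```

Only `shsDiskStore` evaluates to false here, which is exactly the case the comment above says must avoid the expensive path.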
Contributor:
This check isn't whether it's an in-memory store, but whether you're running a live UI (vs. in-memory store in the SHS). Is that your intent? (The method name should match what you intend to check.)

Member Author (@gengliangwang, Aug 9, 2019):
I checked whether it is the live UI in the first commit.
As per the comment in https://github.com/apache/spark/pull/23088/files#diff-3bd1667878f7bda9a56f95e93a80b475R233 and #25369 (comment), I changed it to checking whether it is an InMemoryStore or the live UI.
But later on, I didn't like the idea that the live UI is inconsistent with the SHS, so I raised a question.

Contributor:
Ok, I looked at this again ignoring the PR title and this makes sense. Sorry for flip-flopping here, but could you put this in a local variable in taskSummary with a comment explaining it? e.g.

// SPARK-26119: we only want to consider successful tasks when calculating the metrics summary,
// but currently this is very expensive when using a disk store. So we only trigger the slower code
// path when we know we have all data in memory. This check is an approximation of when we know
// that the data will be in memory.
val isInMemoryStore = store.isInstanceOf[InMemoryStore] || listener.isDefined

Member Author:
Thanks, I have added the comments.


/**
* Calculates a summary of the task metrics for the given stage attempt, returning the
* requested quantiles for the recorded metrics.
@@ -156,7 +162,8 @@ private[spark] class AppStatusStore(
// cheaper for disk stores (avoids deserialization).
val count = {
Utils.tryWithResource(
-      if (store.isInstanceOf[InMemoryStore]) {
+      if (isInMemoryStore) {
Member Author (@gengliangwang):
After reading the comment in https://github.com/apache/spark/pull/23088/files#diff-3bd1667878f7bda9a56f95e93a80b475R233, I think it is on purpose that only the "SUCCESS" tasks are counted when using an InMemoryStore.
cc @shahidki31

Contributor:
Thanks @gengliangwang. The history server uses the InMemory store by default. For the disk store case, this isn't an optimal way of finding successful tasks. I have yet to raise a PR to support the disk store case.

I think you just need to add:
isInMemoryStore: Boolean = listener.isDefined || store.isInstanceOf[InMemoryStore]

I will test your code with all the scenarios.
In any case, this would be temporary, as we need to support the disk store as well.

Member Author (@gengliangwang):
@shahidki31 Thanks for the suggestion.

I am aware that #23088 is meant to follow the behavior of previous versions of Spark. But I wonder if we can simply show the summary metrics for all the tasks instead of only the "SUCCESS" ones, since all the tasks are listed in the task table. Doing that should also make sense to users, the implementation would be simpler, and we wouldn't have to worry about the performance of the disk store.
Also cc @vanzin @srowen

Member:
I don't know enough to evaluate this one, sorry. The code change itself looks plausible.

Contributor (@shahidki31, Aug 7, 2019):
@gengliangwang If I understand correctly, #23088 actually fixes this issue, right? (At least in the history server case.)
Because count always filters out the running tasks, as ExecutorRunTime is defined only for finished tasks. But scanTasks sees all the tasks, including running ones.

.index(TaskIndexNames.EXEC_RUN_TIME)

.parent(stageKey)
.index(index)
.first(0L)

So, reverting #23088, and hence this PR, would not fix the issue, right? Please correct me if I am wrong.
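The point above can be sketched with a minimal, self-contained example (hypothetical `Task` model, not Spark's actual TaskDataWrapper): a metric that is only recorded once a task finishes naturally excludes running tasks when you count through it, while a plain scan of the task table does not.

```scala
// Hypothetical task model: executorRunTime is defined only for finished tasks.
case class Task(status: String, executorRunTime: Option[Long])

val tasks = Seq(
  Task("SUCCESS", Some(120L)),
  Task("FAILED",  Some(40L)),
  Task("RUNNING", None)  // still running: no run time recorded yet
)

// Counting through the run-time "index" implicitly drops the running task...
val countViaRunTimeIndex = tasks.count(_.executorRunTime.isDefined)
// ...while a full scan of the task table sees every task, running ones included.
val countViaScan = tasks.size
```

Here `countViaRunTimeIndex` is 2 but `countViaScan` is 3, which is the count/scan discrepancy being discussed.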

Contributor:
So it sounds like the current behavior is consistent with how Spark has behaved in the past, and this change is proposing a different approach.

A quick look at the history shows the current behavior has been that way forever...

Also, that pointed me at another thing that can be seen in the screenshots. The table header says "Summary Metrics for X Completed Tasks", so this change would make that wrong...

Contributor:
Ok, I need to backtrack here a little. My bad, I think I know why I'm confused.

I've been starting from the PR title, and it confused me a little bit. Could you change it so it describes what the change is doing? e.g. "Ignore non-succeeded tasks when calculating metric summaries."

Let me re-review trying to ignore that and focusing just on the code.

Member Author (@gengliangwang):
@vanzin Got it. I have updated the title.
The PR itself is about a mismatch in the kvstore check, which leads to a wrong cached task summary.

Contributor:
Maybe we need to add a test in AppStatusStoreSuite? There, all the cases are tested against the InMemory store only, I think.

Member Author (@gengliangwang):
@shahidki31 sure, I have added test cases.

// For Live UI, we should count the tasks with status "SUCCESS" only.
store.view(classOf[TaskDataWrapper])
.parent(stageKey)
.index(TaskIndexNames.STATUS)
@@ -245,7 +252,7 @@ private[spark] class AppStatusStore(
// and failed tasks differently (would be tricky). Also would require changing the disk store
// version (to invalidate old stores).
def scanTasks(index: String)(fn: TaskDataWrapper => Long): IndexedSeq[Double] = {
-    if (store.isInstanceOf[InMemoryStore]) {
+    if (isInMemoryStore) {
val quantileTasks = store.view(classOf[TaskDataWrapper])
.parent(stageKey)
.index(index)
core/src/test/scala/org/apache/spark/status/AppStatusStoreSuite.scala
@@ -17,8 +17,7 @@

package org.apache.spark.status

-import org.apache.spark.SparkFunSuite
-import org.apache.spark.status.api.v1.TaskMetricDistributions
+import org.apache.spark.{SparkConf, SparkFunSuite}
import org.apache.spark.util.Distribution
import org.apache.spark.util.kvstore._

@@ -77,14 +76,23 @@ class AppStatusStoreSuite extends SparkFunSuite {
assert(store.count(classOf[CachedQuantile]) === 2)
}

-  test("only successfull task have taskSummary") {
+  private def createLiveStore(inMemoryStore: InMemoryStore): AppStatusStore = {
+    val conf = new SparkConf()
+    val store = new ElementTrackingStore(inMemoryStore, conf)
+    val listener = new AppStatusListener(store, conf, true, None)
+    new AppStatusStore(store, listener = Some(listener))
+  }
+
+  test("SPARK-28638: only successful tasks have taskSummary when with in memory kvstore") {
val store = new InMemoryStore()
(0 until 5).foreach { i => store.write(newTaskData(i, status = "FAILED")) }
-    val appStore = new AppStatusStore(store).taskSummary(stageId, attemptId, uiQuantiles)
-    assert(appStore.size === 0)
+    Seq(new AppStatusStore(store), createLiveStore(store)).foreach { appStore =>
+      val summary = appStore.taskSummary(stageId, attemptId, uiQuantiles)
+      assert(summary.size === 0)
+    }
  }
}

-  test("summary should contain task metrics of only successfull tasks") {
+  test("SPARK-28638: summary should contain successful tasks only when with in memory kvstore") {
val store = new InMemoryStore()

for (i <- 0 to 5) {
Expand All @@ -95,13 +103,15 @@ class AppStatusStoreSuite extends SparkFunSuite {
}
}

-    val summary = new AppStatusStore(store).taskSummary(stageId, attemptId, uiQuantiles).get
+    Seq(new AppStatusStore(store), createLiveStore(store)).foreach { appStore =>
+      val summary = appStore.taskSummary(stageId, attemptId, uiQuantiles).get

-    val values = Array(0.0, 2.0, 4.0)
+      val values = Array(0.0, 2.0, 4.0)

-    val dist = new Distribution(values, 0, values.length).getQuantiles(uiQuantiles.sorted)
-    dist.zip(summary.executorRunTime).foreach { case (expected, actual) =>
-      assert(expected === actual)
+      val dist = new Distribution(values, 0, values.length).getQuantiles(uiQuantiles.sorted)
+      dist.zip(summary.executorRunTime).foreach { case (expected, actual) =>
+        assert(expected === actual)
+      }
+    }
  }
}
