[SPARK-28638][WebUI] Task summary should only contain successful tasks' metrics #25369
@@ -136,6 +136,12 @@ private[spark] class AppStatusStore(
     store.read(classOf[StageDataWrapper], Array(stageId, stageAttemptId)).locality
   }

+  // SPARK-26119: we only want to consider successful tasks when calculating the metrics summary,
+  // but currently this is very expensive when using a disk store. So we only trigger the slower
+  // code path when we know we have all data in memory. The following method checks whether all
+  // the data will be in memory.
+  private def isInMemoryStore: Boolean = store.isInstanceOf[InMemoryStore] || listener.isDefined
+
   /**
    * Calculates a summary of the task metrics for the given stage attempt, returning the
    * requested quantiles for the recorded metrics.
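The predicate added here combines two conditions: the backing store is itself an `InMemoryStore`, or a live `AppStatusListener` is attached (a live UI, whose data is all in memory anyway). A minimal sketch of that logic, using hypothetical stand-in types rather than Spark's real `KVStore` classes:

```scala
// Hypothetical stand-ins for Spark's KVStore hierarchy; only the shape of
// the check matters here, not the real storage backends.
sealed trait KVStore
case object InMemoryStore extends KVStore
case object DiskStore extends KVStore

// Mirrors the patch's predicate: either the store itself is in-memory, or a
// live listener is attached (live UI), so all task data is already in memory.
def isInMemoryStore(store: KVStore, listener: Option[AnyRef]): Boolean =
  store == InMemoryStore || listener.isDefined
```

A history server replaying from a disk store has no listener, so it takes the cheaper all-tasks path; a live UI qualifies for the slower successful-only path even when configured with a disk-backed store.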
@@ -156,7 +162,8 @@ private[spark] class AppStatusStore(
     // cheaper for disk stores (avoids deserialization).
     val count = {
       Utils.tryWithResource(
-        if (store.isInstanceOf[InMemoryStore]) {
+        if (isInMemoryStore) {
After reading the comment in https://github.com/apache/spark/pull/23088/files#diff-3bd1667878f7bda9a56f95e93a80b475R233, I think it is intentional that only "SUCCESS" tasks are counted when with

Thanks @gengliangwang. History server uses
I think you just need to add,
Will test your code with all the scenarios.

@shahidki31 Thanks for the suggestion. I am aware that #23088 is meant to follow the behavior of previous versions of Spark. But I wonder if we could simply show the summary metrics for all the tasks instead of only the "SUCCESS" ones, since all the tasks are listed in the task table. Doing that should also make sense to users. The implementation would be simpler and we wouldn't have to worry about the performance of the disk store.

I don't know enough to evaluate this one, sorry. The code change itself looks plausible.

@gengliangwang If I understand correctly,

spark/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala, lines 263 to 265 in a59fdc4

So, reverting

So it sounds like the current behavior is consistent with how Spark has behaved in the past, and this change is proposing a different approach. A quick look at the history shows the current behavior has been that way forever... Also, that pointed me at another thing that can be seen in the screenshots. The table header says "Summary Metrics for X Completed Tasks", so this change would be making that wrong...

Ok, I need to backtrack here a little. My bad, I think I know why I'm confused. I've been starting from the PR title, and it confused me a little bit. Could you change it so it describes what the change is doing? e.g. "Ignore non-succeeded tasks when calculating metric summaries." Let me re-review, trying to ignore that and focusing just on the code.

@vanzin Got it. I have updated the title.

Maybe we need to add a test in AppStatusStoreSuite? There, all the stores are tested against the InMemory store only, I think.

@shahidki31 Sure, I have added test cases.
+          // For Live UI, we should count the tasks with status "SUCCESS" only.
           store.view(classOf[TaskDataWrapper])
             .parent(stageKey)
             .index(TaskIndexNames.STATUS)
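The count in this hunk filters on task status only when the fast path is available. A simplified sketch of that behavior, where `Task` and its fields are hypothetical stand-ins for `TaskDataWrapper` and its `STATUS` index:

```scala
// Hypothetical simplified task record standing in for TaskDataWrapper.
case class Task(id: Long, status: String, durationMs: Long)

// On the in-memory path we can afford to filter on status and count only
// successful tasks; on the disk path the patch keeps the old behavior of
// counting all tasks, since deserializing each record would be expensive.
def summaryTaskCount(tasks: Seq[Task], isInMemoryStore: Boolean): Int =
  if (isInMemoryStore) tasks.count(_.status == "SUCCESS")
  else tasks.size
```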
@@ -245,7 +252,7 @@ private[spark] class AppStatusStore(
     // and failed tasks differently (would be tricky). Also would require changing the disk store
     // version (to invalidate old stores).
     def scanTasks(index: String)(fn: TaskDataWrapper => Long): IndexedSeq[Double] = {
-      if (store.isInstanceOf[InMemoryStore]) {
+      if (isInMemoryStore) {
         val quantileTasks = store.view(classOf[TaskDataWrapper])
           .parent(stageKey)
           .index(index)
This check isn't whether it's an in-memory store, but whether you're running a live UI (vs. in-memory store in the SHS). Is that your intent? (The method name should match what you intend to check.)
I checked whether it is the live UI in the first commit.
As per the comment in https://github.com/apache/spark/pull/23088/files#diff-3bd1667878f7bda9a56f95e93a80b475R233 and #25369 (comment), I changed it to check whether it is an InMemoryStore / live UI.
But later on, I didn't like the idea that the live UI would be inconsistent with the SHS, so I raised a question.
Ok, I looked at this again ignoring the PR title and this makes sense. Sorry for flip-flopping here, but could you put this in a local variable in taskSummary with a comment explaining it? e.g.

Thanks, I have added comments.
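For reference, the idea behind `scanTasks` on the in-memory branch: keep only successful tasks, sort them by the metric of interest, and read off the values at the requested quantile positions. A self-contained sketch under those assumptions, not the real implementation (which streams over a KVStore index rather than materializing a sequence, and whose `Task` here is a hypothetical stand-in for `TaskDataWrapper`):

```scala
// Hypothetical simplified task record standing in for TaskDataWrapper.
case class Task(id: Long, status: String, durationMs: Long)

// Sketch of the successful-only quantile scan: filter to SUCCESS tasks,
// sort the chosen metric, then index into the sorted values per quantile.
def scanTasks(tasks: Seq[Task], quantiles: IndexedSeq[Double])
             (fn: Task => Long): IndexedSeq[Double] = {
  val values = tasks.filter(_.status == "SUCCESS").map(fn).sorted
  quantiles.map { q =>
    val idx = math.min((q * values.size).toInt, values.size - 1)
    values(idx).toDouble
  }
}
```

With four successful tasks of durations 10, 20, 30, 40 ms and one failed task, the failed task is excluded from the summary, which is exactly the behavior this PR argues about.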