[SPARK-37831][CORE] add task partition id in TaskInfo and Task Metrics #35185
Conversation
+CC @tgravescs, @Ngone51
The main purpose of this issue is to track the task metrics of a specific RDD partition, especially for input/shuffle reads or output/shuffle writes.
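For illustration, here is a minimal sketch (not code from this PR; the class and variable names are made up) of how a SparkListener could use the new TaskInfo.partitionId to aggregate shuffle-read bytes per RDD partition, which is the kind of per-partition tracking described above:

```scala
import scala.collection.mutable

import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

class PartitionMetricsListener extends SparkListener {
  // Total shuffle-read bytes seen per RDD partition, across all task attempts.
  private val shuffleReadByPartition = mutable.Map.empty[Int, Long].withDefaultValue(0L)

  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
    val info = taskEnd.taskInfo
    val metrics = taskEnd.taskMetrics
    if (info != null && metrics != null) {
      // partitionId is the field added by this PR; unlike index, it still
      // identifies the RDD partition when a stage is resubmitted.
      shuffleReadByPartition(info.partitionId) += metrics.shuffleReadMetrics.totalBytesRead
    }
  }

  def snapshot: Map[Int, Long] = shuffleReadByPartition.toMap
}
```

Such a listener would be registered with `sc.addSparkListener(...)`. Before this change, only `index` was available, and it stops matching the partition once a stage is retried.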
FWIW, there's a bug in updating the status in the check (#35179 (comment)). The tests in this PR appear to have passed (https://github.com/stczwd/spark/runs/4799251925).
Yeah, generally index = partition id on the first execution, but that doesn't hold on retries. I remember running into this a few times, but it was never a huge issue; seems like a good thing to add.
@tgravescs Agree, for the first stage attempt this should be fine. An interesting question would be how to handle this for skewed joins - thoughts @cloud-fan? Tagged folks since I wanted to make sure I was not missing something here :-)
This does look like an important missing piece of functionality, thanks for identifying and working on it @stczwd! Given the size of the PR, I will try to circle back to it over the weekend.
mridulm left a comment
Took an initial pass through the PR and added some comments - overall looks good.
We would need to make sure that skew join and partition coalescing in SQL interact well with this change.
index,
attempt,
partitionId,
new Date(launchTime),
+CC @thejdeep, please take a look to make sure there are no backward-incompatible changes.
Existing event files and/or LevelDB stores should be readable after this change.
I have tested it with event files, LevelDB and RocksDB. It is OK with RocksDB and event files, which return "partitionId" : -1. But when testing LevelDB, we get "partitionId" : 0, which is different from what we expected.
BTW, the test scenario is generating the eventLog and LevelDB/RocksDB with an old spark-client and reading them with a new spark-client that has this PR.
Would be good to explore how we can default the value to -1 if missing.
Else, cleaning up all LevelDB stores would be a prerequisite to deploying this.
I am not sure what the cleanest way is to enforce incompatible LevelDB format changes - IIRC you made some changes recently which require a regen, right @dongjoon-hyun? Any suggestions? (IIRC we did not bump up the store version)
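For illustration, a minimal json4s sketch of the defaulting idea, not the PR's actual JsonProtocol code (the "Partition ID" field name and the helper are assumptions): if the field is absent in an event written by an older Spark, report -1.

```scala
import org.json4s._
import org.json4s.jackson.JsonMethods.parse

object PartitionIdCompat {
  implicit val formats: Formats = DefaultFormats

  // Fall back to -1 when the field was never written, so old data stays readable.
  def partitionIdFrom(taskInfoJson: JValue): Int =
    taskInfoJson \ "Partition ID" match {
      case JNothing => -1 // old event log: field not present
      case value => value.extract[Int]
    }
}

// e.g. PartitionIdCompat.partitionIdFrom(parse("""{"Index": 1, "Attempt": 0}""")) == -1
```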
ping @dongjoon-hyun
@dongjoon-hyun Any thoughts about this?
Could you double-check again? @jackylee-ch
@Ngone51 Thanks for double-checking. It works fine when reading old eventLog JSON files with the new spark-client. The scenario that reproduces this problem is using the new spark-client to read old LevelDB data.
"7" : {
"taskId" : 7,
"index" : 1,
"attempt" : 0,
"partitionId" : 0,
...
}
Could you give the reproduction steps? @jackylee-ch
First, use the old spark-client to generate the eventLog and LevelDB with these configs:
spark.history.fs.logDirectory /tmp/spark-client/spark-old/eventLog
spark.history.store.path /tmp/spark-client/spark-old/kvstore
spark.history.store.hybridStore.diskBackend LEVELDB
Then, use the new spark-client to read the eventLog and LevelDB with the configs from step 1.
These are the reproduction steps and configs that I used. @Ngone51
Thanks, I have reproduced the issue with the above steps, and now I see where the problem is. Basically, the fix in JsonProtocol only solves the compatibility issue when reading event files; when reading from LevelDB, there is no compatible conversion. I tried setting the partitionId of TaskDataWrapper to the default value -1 and it seems to work. Could you double-check? @jackylee-ch
@Ngone51 Thanks for your advice. It works fine with the default value. I will add the default value -1 and some comments for it.
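As a hedged sketch of the behaviour agreed on here (this is not the actual TaskDataWrapper, which has many more fields and lives in the status store layer): rows written by an older Spark should surface -1 rather than a spurious 0.

```scala
// Illustrative only: a stored partition id may be absent for rows written
// before this PR; expose -1 in that case, matching the event-log behaviour.
case class TaskRowSketch(
    taskId: Long,
    index: Int,
    attempt: Int,
    // None for rows written by a Spark version that predates this change.
    storedPartitionId: Option[Int]) {

  def partitionId: Int = storedPartitionId.getOrElse(-1)
}
```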
| "attempt" : 1, | ||
| "partitionId": -1, | ||
| "launchTime" : "2018-01-09T10:21:18.943GMT", | ||
| "duration" : 16, |
Since we expect this to default to -1, revert the changes to this (and other similar) files?
Similarly for partitionId = -1 from the JSON files as well.
These JSON files are generated by the REST API. Unless we do not want to show partitionId for history data, they still need to be modified.
Sounds good to keep it as-is.
We need to validate that a missing value defaults to -1 (for both JSON and LevelDB) to test the upgrade case (this is dependent on Dongjoon's suggestions above on how to mitigate the issue, though).
Thanks for your reply. I have tested the interaction with partition coalescing in SQL; it works well with this change.
What I want @cloud-fan, @dongjoon-hyun, etc., who are more familiar with SQL, to look at is: given that a single partition gets computed by multiple tasks (for skew), or multiple partitions get computed by a single task (for coalescing), what is the expectation (between the 'original' reducer stage and the executed reducer stage)? Also, whether this is compatible with future evolution of SQL.
kindly ping @cloud-fan @HyukjinKwon @dongjoon-hyun @rdblue
Ngone51 left a comment
Generally looks good to me.
A kind reminder, @cloud-fan @HyukjinKwon @dongjoon-hyun
It would be good to make sure we test backwards compatibility on the history server, i.e. use the old history server to read a newer event log with this change.
Thanks for your attention. Yep, we have already tested it with the old history server and the new server, using one server to generate the eventLog and another to read it; they all work fine with this change. Besides, we have already used it in our environment, which has different versions of servers, especially Spark 2.x.
mridulm left a comment
A couple of minor comments.
I wanted to confirm whether we have tests where task index != partitionId - for example, the stage resubmission case.
core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala
Yes, we have already tested this. Here is a simple example of stage recomputation with a FetchFailed error.
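For illustration, a hedged sketch (not the PR's actual test; names are made up) of the kind of check being discussed: record each task's (stageAttemptId, index, partitionId) so a test can assert that, after a FetchFailed-triggered resubmission, partitionId still names the RDD partition even where index differs.

```scala
import scala.collection.mutable

import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

class TaskIdentityRecorder extends SparkListener {
  // One tuple per completed task: (stageAttemptId, index, partitionId).
  val records = mutable.Buffer.empty[(Int, Int, Int)]

  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
    val info = taskEnd.taskInfo
    records += ((taskEnd.stageAttemptId, info.index, info.partitionId))
  }
}
```

A test can register this listener, force a shuffle-fetch failure in a later stage, and then assert that the records from attempt 1 carry partition ids from the original partition set even when their indices differ.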
ping @dongjoon-hyun. Could you take a look at the question above?
mridulm left a comment
Changes look good to me.
Would like to understand how to handle the format change impact based on @dongjoon-hyun's experiences though.
+CC @Ngone51, @cloud-fan as well.
@Ngone51 @mridulm Since the default value can be set, the previous problem no longer exists. Can this PR be merged?
Any more questions about this PR? Can it be merged now?
cc @cloud-fan @mridulm @Ngone51 @HyukjinKwon @MaxGekk GA has passed; can we merge this PR to master/3.3.0?
thanks, merging to master/3.3!
### Why are the changes needed?
There is no partition id in the current task metrics. It is difficult to track the task metrics of a specific partition or the stage metrics of processing data, especially when the stage was retried.
```
class TaskData private[spark](
    val taskId: Long,
    val index: Int,
    val attempt: Int,
    val launchTime: Date,
    val resultFetchStart: Option[Date],
    @JsonDeserialize(contentAs = classOf[JLong])
    val duration: Option[Long],
    val executorId: String,
    val host: String,
    val status: String,
    val taskLocality: String,
    val speculative: Boolean,
    val accumulatorUpdates: Seq[AccumulableInfo],
    val errorMessage: Option[String] = None,
    val taskMetrics: Option[TaskMetrics] = None,
    val executorLogs: Map[String, String],
    val schedulerDelay: Long,
    val gettingResultTime: Long)
```
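For reference, a sketch of the same class after this change, based on the diff context above (the canonical definition lives in the `org.apache.spark.status.api.v1` package):

```scala
class TaskData private[spark](
    val taskId: Long,
    val index: Int,
    val attempt: Int,
    // New in this PR: the RDD partition computed by this task. Defaults to -1
    // when replaying history data written before this change.
    val partitionId: Int,
    val launchTime: Date,
    val resultFetchStart: Option[Date],
    @JsonDeserialize(contentAs = classOf[JLong])
    val duration: Option[Long],
    val executorId: String,
    val host: String,
    val status: String,
    val taskLocality: String,
    val speculative: Boolean,
    val accumulatorUpdates: Seq[AccumulableInfo],
    val errorMessage: Option[String] = None,
    val taskMetrics: Option[TaskMetrics] = None,
    val executorLogs: Map[String, String],
    val schedulerDelay: Long,
    val gettingResultTime: Long)
```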
### Does this PR introduce _any_ user-facing change?
no
### How was this patch tested?
add new tests
Closes #35185 from jackylee-ch/SPARK-37831.
Lead-authored-by: stczwd <qcsd2011@163.com>
Co-authored-by: Jacky Lee <qcsd2011@gmail.com>
Co-authored-by: jackylee-ch <lijunqing@baidu.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
(cherry picked from commit 13cfc5b)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
Thanks all.
…quals method in JsonProtocolSuite
### What changes were proposed in this pull request?
In #35185, task partition id was added to TaskInfo, but JsonProtocolSuite#assertEquals for TaskInfo does not check partitionId.
### Why are the changes needed?
assertEquals should also check that partitionId matches.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
No need to add a unit test.
Closes #37081 from dcoliversun/SPARK-39676.
Authored-by: Qian.Sun <qian.sun2020@gmail.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>