[SPARK-22897][CORE]: Expose stageAttemptId in TaskContext #20082
Conversation
cc @cloud-fan
@@ -150,6 +150,10 @@ abstract class TaskContext extends Serializable {
   */
  def stageId(): Int

  /**
   * The attempt ID of the stage that this task belongs to.
can we follow `taskAttemptId` and say more in the document?
Of course, I can add more doc. But is it necessary? I think the field (function) name is already self-explanatory.
Please explain in the JIRA/PR description the use case that needs to access `stageAttemptId` in the executor - we must have a very good reason to make such a change to a public interface.
ok to test
Test build #85409 has finished for PR 20082.
@jiangxb1987 quoted from SPARK-22897:

> In one of our use cases, a Spark task needs to reset some internal state once the stage changes (we don't want to reset the state for every task, as there may be a lot of tasks). IMHO, stageAttemptId is as important as stageId or taskAttemptId and should be provided to the user. As for compatibility, we should do our best to avoid any incompatibility in minor releases. However, ...
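For illustration, a minimal sketch of that use case, assuming the accessor this PR ultimately exposes (`stageAttemptNumber()`, see the naming discussion below); the `ExpensiveState` object and its fields are hypothetical:

```scala
import org.apache.spark.TaskContext

// Hypothetical per-executor state that should be rebuilt once per stage attempt,
// not once per task.
object ExpensiveState {
  @volatile private var lastSeen: (Int, Int) = (-1, -1)

  def ensureInitializedFor(ctx: TaskContext): Unit = synchronized {
    val key = (ctx.stageId(), ctx.stageAttemptNumber())
    if (key != lastSeen) {
      // ... rebuild the expensive internal state here ...
      lastSeen = key
    }
  }
}

// Typical call site inside a task:
// rdd.mapPartitions { iter =>
//   ExpensiveState.ensureInitializedFor(TaskContext.get())
//   iter.map(process)
// }
```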
1. Update MiMa exclude
2. Update wording in TaskContext
3. Update JavaTaskContextCompileCheck as suggested in the TaskContext note
project/MimaExcludes.scala (outdated)

    ProblemFilters.exclude[FinalClassProblem]("org.apache.spark.ml.tuning.TrainValidationSplitModel$TrainValidationSplitModelWriter"),

    // [SPARK-22897] Expose stageAttemptId in TaskContext
    ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.TaskContext.stageAttemptId")
This change is suggested by #12248. If it's not appropriate, please let me know.
can you put it at the beginning of `v23excludes`?
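For reference, a sketch of what that placement might look like in project/MimaExcludes.scala, assuming the file follows its usual pattern of a per-release `v23excludes` sequence built from MiMa `ProblemFilters` (surrounding entries omitted):

```scala
import com.typesafe.tools.mima.core._

// Exclusions for Spark 2.3.x; the new entry goes right at the top, as requested.
lazy val v23excludes = v22excludes ++ Seq(
  // [SPARK-22897] Expose stageAttemptId in TaskContext
  ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.TaskContext.stageAttemptId")

  // ... remaining 2.3 exclusions elided ...
)
```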
Test build #85410 has finished for PR 20082.

Test build #85420 has finished for PR 20082.
Force-pushed from 0f86e95 to 59e4a9c.
Test build #85421 has finished for PR 20082.
    // Check stage attemptIds that are resubmitted when task fails
    val stageAttemptIdsWithFailedStage =
      sc.parallelize(Seq(1, 2, 3, 4), 4).repartition(1).mapPartitions { _ =>
You don't need `repartition` here, just `sc.parallelize(Seq(1, 2, 3, 4), 1).mapPartitions {...}`
      sc.parallelize(Seq(1, 2, 3, 4), 4).repartition(1).mapPartitions { _ =>
        val stageAttemptId = TaskContext.get().stageAttemptId()
        if (stageAttemptId < 2) {
          throw new FetchFailedException(null, 0, 0, 0, "Fake")
Emmm... just throwing an `Exception` is enough here?
Related to the repartition part: I use FetchFailedException to explicitly trigger a stage resubmission. Otherwise, the task would be resubmitted in the same stage, IIRC.
oh, right~
Please add a comment to explain that `FetchFailedException` will trigger a new stage attempt, while a common `Exception` will only trigger a task retry.
Will do.
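A sketch of how the commented test might read, using the `stageAttemptId()` accessor as it is named at this point in the review (later renamed `stageAttemptNumber()`), and assuming a local-mode `SparkContext` named `sc` and ScalaTest assertions as in TaskContextSuite:

```scala
import org.apache.spark.TaskContext
import org.apache.spark.shuffle.FetchFailedException

// Check the stage attempt IDs observed by a stage that is resubmitted when a task fails.
// Throwing FetchFailedException makes the scheduler treat the shuffle output as lost and
// resubmit the stage (a new stage attempt); a plain exception would only retry the task
// within the same stage attempt.
val stageAttemptIdsWithFailedStage =
  sc.parallelize(Seq(1, 2, 3, 4), 4).repartition(1).mapPartitions { _ =>
    val stageAttemptId = TaskContext.get().stageAttemptId()
    if (stageAttemptId < 2) {
      // repartition() above introduces the shuffle whose fetch we pretend has failed.
      throw new FetchFailedException(null, 0, 0, 0, "Fake fetch failure")
    }
    Seq(stageAttemptId).iterator
  }.collect()

// Attempts 0 and 1 fail, so only the third attempt (attempt number 2) produces output.
assert(stageAttemptIdsWithFailedStage.toSet === Set(2))
```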
Test build #85429 has finished for PR 20082.
LGTM except one nit
@@ -42,6 +42,7 @@ import org.apache.spark.util._
 */
private[spark] class TaskContextImpl(
    val stageId: Int,
    val stageAttemptId: Int,
nit: add `override`. Since you are touching this file, could you also add `override` to `stageId` and `partitionId`?
Will do. Would you tell me the difference or rationale?
It's kind of a code style standard: add `override` if it is an override.
OK then.
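A minimal, self-contained illustration of the convention being asked for, using hypothetical `Context`/`ContextImpl` classes rather than Spark's real ones: when a constructor `val` implements an abstract member, mark it with `override` explicitly.

```scala
// Abstract interface with parameterless accessors, mirroring the TaskContext style.
abstract class Context extends Serializable {
  def stageId(): Int
  def stageAttemptId(): Int
  def partitionId(): Int
}

// Concrete implementation: each constructor val overrides the corresponding def,
// and the override keyword makes that relationship explicit.
class ContextImpl(
    override val stageId: Int,
    override val stageAttemptId: Int,
    override val partitionId: Int)
  extends Context
```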
  /**
   * An ID that is unique to the stage attempt that this task belongs to.
   */
  def stageAttemptId(): Int
I think we should call it `stageAttemptNumber` to be consistent with `taskAttemptNumber`. Also, let's follow the comment of `attemptNumber`.
Yeah, if we were defining `stageAttemptId` from scratch, I would go for `stageAttemptNumber`. However, `stageAttemptId` is already used elsewhere in the codebase, like in Task.scala, and I think it's more important to be consistent. I could update the comment to reflect the attempt-number semantics if you wish.
Updating wording in TaskContext.stageAttemptId
Test build #85457 has finished for PR 20082.
retest this please
Test build #85474 has finished for PR 20082.
ping @cloud-fan @jiangxb1987 @zsxwing, I think it's ready for merging.
 * times the stage has been attempted. The first stage attempt will be assigned stageAttemptId = 0,
 * and subsequent attempts will increase stageAttemptId by one each time.
 */
def stageAttemptId(): Int
My concern is that internally we use `stageAttemptId`, and internally we call `TaskContext.taskAttemptId` `taskId`. However, end users don't know the internal code; they are more familiar with `TaskContext`. I think the naming should be consistent with the public `TaskContext` API, not with internal code.
I have no objection to either 'id' or 'number'; they are both reasonable. I am on a train now. If there is no other input, I can rename it to 'stageAttemptNumber' since you insist.
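For reference, a sketch of the renamed public accessor after this change; the documentation text paraphrases the comment discussed above, and the exact merged Javadoc may differ slightly:

```scala
abstract class TaskContext extends Serializable {
  // ... other members elided ...

  /**
   * How many times the stage that this task belongs to has been attempted. The first stage
   * attempt is assigned stageAttemptNumber = 0, and subsequent attempts get increasing
   * attempt numbers.
   */
  def stageAttemptNumber(): Int
}
```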
Force-pushed from 2581263 to 9266cd8.
Test build #85512 has finished for PR 20082.

Test build #85513 has finished for PR 20082.
@cloud-fan Please take another look.
@@ -79,6 +79,7 @@ private[spark] abstract class Task[T](
    SparkEnv.get.blockManager.registerTask(taskAttemptId)
    context = new TaskContextImpl(
      stageId,
      stageAttemptId, // stageAttemptId and stageAttemptNumber are semantically equal
How much work would it take to rename the internal `stageAttemptId` to `stageAttemptNumber`?
The modification may not be too much (100+ occurrences in 20+ files); however, it may break the event log's JsonProtocol backward compatibility (not sure). @squito, you may have more knowledge on this since you introduced `stageAttemptId`.
Ah, so `stageAttemptId` is already exposed in a developer API; we can't change it.
LGTM
## What changes were proposed in this pull request?

stageAttemptId added in TaskContext and corresponding construction modification.

## How was this patch tested?

Added a new test in TaskContextSuite; two cases are tested:
1. Normal case without failure
2. Exception case with resubmitted stages

Link to [SPARK-22897](https://issues.apache.org/jira/browse/SPARK-22897)

Author: Xianjin YE <advancedxy@gmail.com>

Closes #20082 from advancedxy/SPARK-22897.

(cherry picked from commit a6fc300)

Signed-off-by: Wenchen Fan <wenchen@databricks.com>
thanks, merging to master/2.3!
@advancedxy could you also submit a PR to add a ...
Of course.
Sorry this is a little late, but LGTM too. I agree with the points above about leaving the old name deprecated and moving to the new name.
## What changes were proposed in this pull request?

stageAttemptId added in TaskContext and corresponding construction modification.

## How was this patch tested?

Added a new test in TaskContextSuite; two cases are tested:
1. Normal case without failure
2. Exception case with resubmitted stages

Link to [SPARK-22897](https://issues.apache.org/jira/browse/SPARK-22897)