[SPARK-22897][CORE]: Expose stageAttemptId in TaskContext #20082


Closed
wants to merge 6 commits

Conversation

advancedxy
Contributor

What changes were proposed in this pull request?

Added stageAttemptId to TaskContext, with the corresponding changes to TaskContext construction.

How was this patch tested?

Added a new test in TaskContextSuite, two cases are tested:

  1. Normal case without failure
  2. Exception case with resubmitted stages

Link to SPARK-22897
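For readers unfamiliar with the API, the shape of the change is roughly the following. This is a minimal sketch, not the actual Spark source: the real TaskContext declares these members with empty parens, and TaskContextImpl's constructor takes many more parameters.

```scala
// Minimal sketch of the proposed API shape (hypothetical simplification,
// not the actual Spark source).
abstract class TaskContext extends Serializable {
  def stageId: Int

  /** How many times the stage that this task belongs to has been attempted.
   *  The first attempt is assigned 0. */
  def stageAttemptId: Int
}

// Constructor parameters implement the abstract members.
class TaskContextImpl(
    override val stageId: Int,
    override val stageAttemptId: Int) extends TaskContext

val ctx = new TaskContextImpl(stageId = 3, stageAttemptId = 0)
```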

@advancedxy
Contributor Author

cc @cloud-fan

@@ -150,6 +150,10 @@ abstract class TaskContext extends Serializable {
*/
def stageId(): Int

/**
* The attempt ID of the stage that this task belongs to.
Contributor

Can we follow taskAttemptId and say more in the documentation?

Contributor Author

Of course, I can add more documentation.

But is it necessary? I think the field (function) name is already self-explanatory.

@jiangxb1987
Contributor

Please explain in the JIRA/PR description the use case that needs to access the stageAttemptId in the executor - we must have a very good reason to make such a change to a public interface.

@cloud-fan
Contributor

ok to test

@SparkQA

SparkQA commented Dec 26, 2017

Test build #85409 has finished for PR 20082 at commit 5753ee0.

  • This patch fails MiMa tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@advancedxy
Contributor Author

Please explain in the JIRA/PR description the use case that needs to access the stageAttemptId in the executor - we must have a very good reason to make such a change to a public interface.

@jiangxb1987 quoted from SPARK-22897:

Currently, there's no easy way for an executor to detect that a new stage has been launched, as stageAttemptId is missing.

In one of our use cases, a Spark task needs to reset some internal state once the stage changes (we don't want to reset the state for every task, as there may be a lot of tasks).

IMHO, stageAttemptId is as important as stageId or taskAttemptId and should be provided to users.
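The use case can be sketched as a small helper (hypothetical code, not from this PR): an executor-side component remembers the last (stageId, stageAttemptId) pair it saw and resets its state only when that pair changes, instead of once per task.

```scala
// Hypothetical executor-side helper illustrating the use case above.
// State is reset once per stage attempt rather than once per task.
class PerStageState[S](init: () => S) {
  private var lastKey: Option[(Int, Int)] = None
  private var state: S = init()

  /** Returns the state, resetting it first if the stage attempt changed. */
  def forAttempt(stageId: Int, stageAttemptId: Int): S = synchronized {
    val key = (stageId, stageAttemptId)
    if (!lastKey.contains(key)) {
      state = init()        // new stage or new attempt: reset
      lastKey = Some(key)
    }
    state
  }
}

// Example: a per-stage mutable cache.
val cache = new PerStageState[scala.collection.mutable.Map[String, Int]](
  () => scala.collection.mutable.Map.empty)
```

Inside a task, the key would come from TaskContext, e.g. `cache.forAttempt(ctx.stageId(), ctx.stageAttemptId())`.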

As for compatibility, we should do our best to avoid any incompatibility in minor releases. However,
this change should be source and binary compatible with previous releases.

1. Update mima exclude
2. Update wording in TaskContext
3. Update JavaTaskContextCompileCheck as suggested in TaskContext note
ProblemFilters.exclude[FinalClassProblem]("org.apache.spark.ml.tuning.TrainValidationSplitModel$TrainValidationSplitModelWriter"),

// [SPARK-22897] Expose stageAttemptId in TaskContext
ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.TaskContext.stageAttemptId")
Contributor Author

This change is suggested by #12248

If not appropriate, please let me know.

Contributor

can you put it at the beginning of v23excludes?

@SparkQA

SparkQA commented Dec 26, 2017

Test build #85410 has finished for PR 20082 at commit f02bc1e.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Dec 27, 2017

Test build #85420 has finished for PR 20082 at commit 0f86e95.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Dec 27, 2017

Test build #85421 has finished for PR 20082 at commit 59e4a9c.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.


// Check stage attemptIds that are resubmitted when task fails
val stageAttemptIdsWithFailedStage =
sc.parallelize(Seq(1, 2, 3, 4), 4).repartition(1).mapPartitions { _ =>
Contributor

You don't need repartition here, just sc.parallelize(Seq(1, 2, 3, 4), 1).mapPartitions {...}

sc.parallelize(Seq(1, 2, 3, 4), 4).repartition(1).mapPartitions { _ =>
val stageAttemptId = TaskContext.get().stageAttemptId()
if (stageAttemptId < 2) {
throw new FetchFailedException(null, 0, 0, 0, "Fake")
Contributor

Emmm... is just throwing an Exception enough here?

Contributor Author

Related to the repartition part.

I use FetchFailedException to explicitly trigger a stage resubmission. IIRC, the task would otherwise be resubmitted within the same stage.

Contributor

oh, right~

Contributor

Please add a comment to explain that FetchFailedException will trigger a new stage attempt, while a common Exception will only trigger a task retry.
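The distinction can be illustrated with a toy model of the scheduler's behavior (illustrative only; the real DAGScheduler logic is far more involved): a fetch failure resubmits the stage, bumping the stage attempt id, while an ordinary exception only retries the task within the same stage attempt.

```scala
// Toy model (not Spark code): how the two failure kinds affect the
// stage attempt id seen by a retried task.
sealed trait Failure
case object FetchFailed extends Failure   // stands in for FetchFailedException
case object TaskCrashed extends Failure   // stands in for any ordinary exception

def nextStageAttemptId(current: Int, failure: Failure): Int = failure match {
  case FetchFailed => current + 1  // stage resubmitted: new stage attempt
  case TaskCrashed => current      // task retried inside the same stage attempt
}
```

This is why the test above must throw FetchFailedException to observe stageAttemptId advancing.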

Contributor Author

Will do.

@SparkQA

SparkQA commented Dec 27, 2017

Test build #85429 has finished for PR 20082 at commit 291bbbc.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Member

@zsxwing zsxwing left a comment

LGTM except one nit

@@ -42,6 +42,7 @@ import org.apache.spark.util._
*/
private[spark] class TaskContextImpl(
val stageId: Int,
val stageAttemptId: Int,
Member

nit: add override. Since you are touching this file, could you also add override to stageId and partitionId.

Contributor Author

Will do.

Would you tell me the difference, or the rationale?

Contributor

It's a kind of code style standard: add override when a member is an override.

Contributor Author

OK then.
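The style point can be shown with a toy illustration (hypothetical names, not the real file): a constructor val can implement an abstract member without the keyword, but marking it override makes the intent explicit.

```scala
// Sketch of the suggested style: mark constructor parameters that
// implement abstract members with `override`, so the relationship to
// the parent type is explicit at the definition site.
abstract class Ctx {
  def stageId: Int
  def partitionId: Int
}

class CtxImpl(
    override val stageId: Int,      // explicit: implements Ctx.stageId
    override val partitionId: Int)  // explicit: implements Ctx.partitionId
  extends Ctx

val c = new CtxImpl(1, 7)
```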

/**
* An ID that is unique to the stage attempt that this task belongs to.
*/
def stageAttemptId(): Int
Contributor

I think we should call it stageAttemptNumber to be consistent with taskAttemptNumber. Also, let's follow the comment style of attemptNumber.

Contributor Author

Yeah, if we were defining stageAttemptId from scratch, I would go for stageAttemptNumber. However, stageAttemptId is already used elsewhere in the codebase, like in Task.scala. I think it's more important to be consistent.

That said, I could update the comment to reflect the attempt-number part if you wish.

Updating wording in TaskContext.stageAttemptId
@SparkQA

SparkQA commented Dec 28, 2017

Test build #85457 has finished for PR 20082 at commit 72a3abf.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@jiangxb1987
Contributor

retest this please

@SparkQA

SparkQA commented Dec 28, 2017

Test build #85474 has finished for PR 20082 at commit 72a3abf.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@advancedxy
Contributor Author

ping @cloud-fan @jiangxb1987 @zsxwing, I think it's ready for merging.

* times the stage has been attempted. The first stage attempt will be assigned stageAttemptId = 0,
* and subsequent attempts will increase stageAttemptId by one each time.
*/
def stageAttemptId(): Int
Contributor

My concern is that, internally we use stageAttemptId, and internally we call TaskContext.taskAttemptId taskId. However, for end users, they don't know the internal code, and they are more familiar with TaskContext. I think the naming should be consistent with the public API TaskContext, instead of internal code.

Contributor Author

I have no objection to either 'id' or 'number'; they are both reasonable.

I am on a train now. If there is no other input, I can rename it to 'stageAttemptNumber' since you insisted.

@SparkQA

SparkQA commented Dec 29, 2017

Test build #85512 has finished for PR 20082 at commit 2581263.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Dec 29, 2017

Test build #85513 has finished for PR 20082 at commit 9266cd8.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@advancedxy
Contributor Author

@cloud-fan Please take another look.

@@ -79,6 +79,7 @@ private[spark] abstract class Task[T](
SparkEnv.get.blockManager.registerTask(taskAttemptId)
context = new TaskContextImpl(
stageId,
stageAttemptId, // stageAttemptId and stageAttemptNumber are semantically equal
Contributor

How much work would it be to rename the internal stageAttemptId to stageAttemptNumber?

Contributor Author

The modification may not be too large (100+ occurrences in 20+ files); however, it may break the event log's JsonProtocol backward compatibility (not sure).

@squito you may have more knowledge on this since you introduced stageAttemptId.

Contributor

Ah, so stageAttemptId is already exposed in a developer API; we can't change it.

@cloud-fan
Contributor

LGTM

asfgit pushed a commit that referenced this pull request Jan 2, 2018
## What changes were proposed in this pull request?
stageAttemptId added in TaskContext and corresponding construction modification

## How was this patch tested?
Added a new test in TaskContextSuite, two cases are tested:
1. Normal case without failure
2. Exception case with resubmitted stages

Link to [SPARK-22897](https://issues.apache.org/jira/browse/SPARK-22897)

Author: Xianjin YE <advancedxy@gmail.com>

Closes #20082 from advancedxy/SPARK-22897.

(cherry picked from commit a6fc300)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
@cloud-fan
Contributor

thanks, merging to master/2.3!

@asfgit asfgit closed this in a6fc300 Jan 2, 2018
@advancedxy advancedxy deleted the SPARK-22897 branch January 2, 2018 15:33
@zsxwing
Member

zsxwing commented Jan 3, 2018

@advancedxy could you also submit a PR to add a def stageAttemptNumber = stageAttemptId to StageInfo and mark stageAttemptId as deprecated? It will make our public APIs more consistent.
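The suggested follow-up would look roughly like this (a sketch against a simplified StageInfo, not the actual class, which has many more fields):

```scala
// Sketch of the suggested follow-up on a hypothetical, simplified StageInfo:
// keep the old name for compatibility but deprecate it, and add the new,
// consistently named accessor as an alias.
class StageInfo(val stageId: Int, attempt: Int) {
  @deprecated("Use stageAttemptNumber instead.", "2.3.0")
  def stageAttemptId: Int = attempt

  def stageAttemptNumber: Int = attempt  // new name, consistent with TaskContext
}

val info = new StageInfo(stageId = 5, attempt = 1)
```

Existing callers keep compiling (with a deprecation warning), while new code can use the consistent name.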

@advancedxy
Contributor Author

Of course.

@squito
Contributor

squito commented Jan 5, 2018

sorry this is a little late, but lgtm too. agree with the points above about leaving the old name deprecated and moving to the new name

vanzin pushed a commit to vanzin/spark that referenced this pull request Jun 21, 2018

(cherry picked from commit a6fc300)
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
tgravescs pushed a commit to tgravescs/spark that referenced this pull request Jun 22, 2018

Conflicts:
	project/MimaExcludes.scala
asfgit pushed a commit that referenced this pull request Jun 22, 2018

Author: Xianjin YE <advancedxy@gmail.com>
Author: Thomas Graves <tgraves@unharmedunarmed.corp.ne1.yahoo.com>

Closes #21609 from tgravescs/SPARK-22897.
MatthewRBruce pushed a commit to Shopify/spark that referenced this pull request Jul 31, 2018
jzhuge pushed a commit to jzhuge/spark that referenced this pull request Aug 20, 2018

(cherry picked from commit a6fc300)
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
turboFei pushed a commit to turboFei/spark that referenced this pull request Aug 31, 2018
6 participants