
[SPARK-24589][core] Correctly identify tasks in output commit coordinator. #21577


Closed
wants to merge 12 commits

Conversation

vanzin
Contributor

@vanzin vanzin commented Jun 15, 2018

When an output stage is retried, it's possible that tasks from the previous
attempt are still running. In that case, there would be a new task for the
same partition in the new attempt, and the coordinator would allow both
tasks to commit their output since it did not keep track of stage attempts.

The change adds more information to the stage state tracked by the coordinator,
so that only one task is allowed to commit the output in the above case.
The stage state in the coordinator is also maintained across stage retries,
so that a stray speculative task from a previous stage attempt is not allowed
to commit.

This also removes some code added in SPARK-18113 that allowed for duplicate
commit requests; with the RPC code used in Spark 2, that situation cannot
happen, so there is no need to handle it.
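
To illustrate the idea, here is a minimal, self-contained sketch. The names TaskIdentifier, StageState, authorizedCommitters, and stageStart mirror the diff hunks discussed below, but the structure is simplified and hypothetical, not the actual OutputCommitCoordinator:

import scala.collection.mutable

// Sketch of the commit-authorization idea: the first task asking to commit a
// partition wins, the winner is remembered as (stageAttempt, taskAttempt), and
// the state survives stage retries so stray tasks from older attempts are denied.
object CommitCoordinatorSketch {
  private case class TaskIdentifier(stageAttempt: Int, taskAttempt: Int)

  private class StageState(numPartitions: Int) {
    // null = no authorized committer yet for that partition
    val authorizedCommitters = new Array[TaskIdentifier](numPartitions)
  }

  private val stageStates = mutable.Map[Int, StageState]()

  def stageStart(stage: Int, maxPartitionId: Int): Unit = synchronized {
    // Reuse existing state on a retry instead of wiping it.
    stageStates.getOrElseUpdate(stage, new StageState(maxPartitionId + 1))
  }

  def canCommit(stage: Int, stageAttempt: Int, partition: Int, taskAttempt: Int): Boolean =
    synchronized {
      val state = stageStates(stage)
      val attempt = TaskIdentifier(stageAttempt, taskAttempt)
      state.authorizedCommitters(partition) match {
        case null =>
          state.authorizedCommitters(partition) = attempt
          true
        case existing =>
          // Only the already-authorized task may be told "yes" again.
          existing == attempt
      }
    }
}

With this shape, a second request for the same partition from a different stage attempt or task attempt returns false, which is the behavior described above.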

…ator.

When an output stage is retried, it's possible that tasks from the previous
attempt are still running. In that case, there would be a new task for the
same partition in the new attempt, and the coordinator would allow both
tasks to commit their output since it did not keep track of stage attempts.

The change adds more information to the stage state tracked by the coordinator,
so that only one task is allowed to commit the output in the above case.

This also removes some code added in SPARK-18113 that allowed for duplicate
commit requests; with the RPC code used in Spark 2, that situation cannot
happen, so there is no need to handle it.
@SparkQA

SparkQA commented Jun 15, 2018

Test build #91949 has finished for PR 21577 at commit 09e5d15.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jun 16, 2018

Test build #91950 has finished for PR 21577 at commit d471b74.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@tgravescs
Contributor

this was along the lines of what I was thinking as well. Will do a full review later.
Just curious if you were able to create a test to actually reproduce it?

From the other PR:

and data source v2 API assumes (job id, partition id, task attempt id) can uniquely define a write task, even counting the failure cases.

Are there other docs that need to be updated for v2 datasource api? @rdblue @cloud-fan

@vanzin
Contributor Author

vanzin commented Jun 18, 2018

I think Ryan's change might still be good to introduce (i.e. a change that replaces the attempt id in that code with something a little more unique), regardless of any fix here.

The unit tests I added artificially re-create the calls that would lead to the situation, but I haven't tried to create a test case that would run things through the scheduler.

@@ -81,7 +81,7 @@ object SparkHadoopMapRedUtil extends Logging {
logInfo(message)
// We need to abort the task so that the driver can reschedule new attempts, if necessary
committer.abortTask(mrTaskContext)
- throw new CommitDeniedException(message, stageId, splitId, taskAttemptNumber)
+ throw new CommitDeniedException(message, ctx.stageId(), splitId, ctx.attemptNumber())
Contributor

shall we also include stage attempt number in the exception?

Contributor Author

Sure. Was trying to minimize changes in the first version, for testing.

@cloud-fan
Contributor

Are there other docs that need to be updated for v2 datasource api?

Yes, we need to, but this can be done in a different PR.

@cloud-fan
Contributor

thanks! the fix LGTM

@vanzin
Contributor Author

vanzin commented Jun 18, 2018

I'll try to create a test to exercise this in a real job (aside from the exception changes @cloud-fan suggested), but I wouldn't hold my breath.

Rename the field to match what it actually is; except for the JSON-serialized
version, which for backwards compatibility still uses "job" instead of "stage".
state.authorizedCommitters(partition) = TaskIdentifier(stageAttempt, attemptNumber)
true
} else {
logDebug(s"Commit denied for stage=$stage/$attemptNumber, partition=$partition: " +
Contributor

would be nice to include the stage attempt in the log messages as well.

@vanzin vanzin changed the title [WIP] [SPARK-24552][core] Correctly identify tasks in output commit coordinator. [SPARK-24552][core] Correctly identify tasks in output commit coordinator. Jun 18, 2018
@vanzin
Contributor Author

vanzin commented Jun 18, 2018

I tried to create a test based on actually running a job, but I'd have to do a lot of hacking to control what the result stage does, and it was starting to feel not much better than the unit test I added here already, so I gave up.

@SparkQA

SparkQA commented Jun 18, 2018

Test build #92039 has finished for PR 21577 at commit a2f4c1b.

  • This patch fails MiMa tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

logDebug(s"Authorized committer (attemptNumber=$attemptNumber, stage=$stage, " +
s"partition=$partition) failed; clearing lock")
- stageState.authorizedCommitters(partition) = NO_AUTHORIZED_COMMITTER
+ stageState.authorizedCommitters(partition) = null
Contributor

Nit: why not use Option[TaskIdentifier] and None here?

Contributor Author

Less memory usage, at least. Not sure what advantage using Option would bring here.

s"attempt: $attemptNumber")
case otherReason =>
case _: TaskCommitDenied =>
logInfo(s"Task was denied committing, stage: $stage / $stageAttempt, " +
Contributor

Nit: Should this be s"$stage.$stageAttempt"?

Contributor Author

That looks better and saves some space, so will do.

@@ -399,7 +399,8 @@ private[spark] object JsonProtocol {
("Full Stack Trace" -> exceptionFailure.fullStackTrace) ~
("Accumulator Updates" -> accumUpdates)
case taskCommitDenied: TaskCommitDenied =>
("Job ID" -> taskCommitDenied.jobID) ~
("Job ID" -> taskCommitDenied.stageID) ~
("Job Attempt Number" -> taskCommitDenied.stageAttempt) ~
Contributor

Why does this use "Job" and not "Stage"?

Contributor

Also, will this affect the compatibility of the history server files?

Contributor Author

For the new property, I'm just following what the old property says, even though it's wrong. I think having Job ID and Stage Attempt Number would just be even more confusing...

Contributor Author

And it shouldn't affect compatibility. Given the code, even an old history server would be able to read these new log files.

@rdblue
Contributor

rdblue commented Jun 18, 2018

+1. This fixes the commit coordinator problem where two separate tasks can be authorized. That case could lead to duplicate data (if, for example, both tasks generated unique file names using a random UUID).

However, this doesn't address the problem I hit in practice, where a file was created twice and deleted once because the same task attempt number was both allowed to commit by the coordinator and denied commit by the coordinator (after the stage had finished).

We still need the solution proposed in #21558 for the v2 API. But that's more of a v2 API problem because that API makes the guarantee that implementations can rely on the attempt ID.

@@ -109,20 +116,21 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean)
* @param maxPartitionId the maximum partition id that could appear in this stage's tasks (i.e.
* the maximum possible value of `context.partitionId`).
*/
- private[scheduler] def stageStart(stage: StageId, maxPartitionId: Int): Unit = synchronized {
+ private[scheduler] def stageStart(stage: Int, maxPartitionId: Int): Unit = synchronized {
stageStates(stage) = new StageState(maxPartitionId + 1)
Contributor

@mridulm mridulm Jun 18, 2018

My memory is a bit rusty here, but are we changing the semantics of which task can commit here?
A couple of queries:

  • Are we allowing a task from a previous stage attempt to commit for the current stage attempt (after the previous stage attempt has failed/finished)?
  • Based on TaskIdentifier above, I think the answer is yes?
    • If not, should we check and reject commit requests from tasks of an 'older' stage attempt when the current stage attempt is different?

Contributor Author

I don't think the semantics are changing. It's always been racy, in that either of the concurrent tasks from different stage attempts may succeed first. And I'm almost sure the assumption is that both task attempts are equivalent (i.e. the output is deterministic or at least should be), so it should be fine for either to be committed.

The problem is that without this change the coordinator would allow both attempts to commit, and that is kinda bad.

Contributor

There are two cases here (both not handled in existing/earlier code).

Handled in PR:

  • Stage S1 attempt A1 launched.
  • Task T1_1 launched for partition P1.
  • A1 fails.
  • Stage S1 attempt A2 launched.
  • Task T1_2 launched for partition P1.
  • T1_1 finishes, and is allowed to commit.

IMO not handled in PR:

  • Stage S1 attempt A1 launched.
  • Task T1_1.1 launched for partition P1.
  • Task T1_1.2 launched for partition P1 (speculative).
  • Task T1_1.1 committed.
  • A1 fails.
  • Stage S1 attempt A2 launched for some other pending partitions.
  • Task T1_1.2 wants to commit.

T1_1.2 will be allowed to commit.
Now we have two tasks for the same partition successfully committing.

Did I miss something here?

Contributor

T1_1.2 will not be allowed, since it has a different task attempt number.

Contributor

If I read @vanzin's PR right, T1_1.2 will be allowed to commit, since there is a stageEnd + stageStart in between (which clears the existing stage state).

Contributor Author

@vanzin vanzin Jun 19, 2018

Yeah, I think this can happen. The problem is that with the current way it's used, the output committer forgets the commit status between stage retries. I think the right thing would be for the committer to keep the stage-related state until the scheduler is done with all its attempts.

whether it will generate corrupted data when the commit process of T1_1.1 didn't finish

I don't think that's the problem. The problem is that if both the initial task and the speculative task finish successfully, but across stage attempt barriers (so the output committer is "reset" in between), both will be allowed to commit, so you get duplicate data.

Contributor

So in scenario 2, once the first task finishes and is committed, the taskset manager will kill the speculative task T1_1.2. But since it sends an async message to kill the task, T1_1.2 could actually try to commit after another task fails, causes the stage to remove itself from the output commit coordinator, and another stage attempt starts. So it could actually end up committing the task output for T1_1.2. I'm not sure this case by itself is a problem, though, since if T1_1.1 was committed and T1_1.2 is allowed to commit, they should have the same output and commitJob would handle it in at least most cases. The caveat is that since T1_1.1 was committed, the second stage attempt could finish and call commitJob while T1_1.2 is still committing, since Spark thinks it doesn't need to wait for T1_1.2. Anyway, this seems very unlikely, but we should protect against it.

There is another case here, though, where T1_1.1 could have just asked to commit but not yet committed; if its commit gets delayed, the new stage attempt starts and T1_1.2 asks if it can commit and is granted, so both try to commit at the same time, causing corruption.

Contributor

I think the right thing would be for the committer to keep the stage-related state until the scheduler is done with all its attempts.

We should change the DAGScheduler a little bit so that, if a stage is killed and going to be re-tried, it does not clear the stage state in the output coordinator.

Contributor Author

Yep, I'm going down that path. I just want to add a proper test to make sure the behavior is correct and that's a little bit tricky.

Contributor

I agree @vanzin, @cloud-fan. We should remove the stage info only after the stage is done.
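
A hedged sketch of that idea (illustrative names only, not the real DAGScheduler or coordinator API): tell the coordinator a stage has ended only once no further attempt of it will run, so the commit state survives retries.

// Illustrative only: the coordinator's per-stage state is dropped only when the
// scheduler knows no more attempts of that stage will run, so a straggler task
// from an old attempt is still checked against the recorded committers.
trait CommitCoordinatorApi {
  def stageStart(stage: Int): Unit
  def stageEnd(stage: Int): Unit
}

class SchedulerSketch(coordinator: CommitCoordinatorApi) {
  // Called when an attempt of a stage finishes (successfully or not).
  def onStageAttemptFinished(stage: Int, failed: Boolean, willBeResubmitted: Boolean): Unit = {
    if (failed && willBeResubmitted) {
      // Keep the coordinator's state: tasks from the old attempt may still be
      // running and must keep being checked against the authorized committers.
    } else {
      // No more attempts will run, so it is safe to drop the stage's commit state.
      coordinator.stageEnd(stage)
    }
  }
}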

@vanzin
Contributor Author

vanzin commented Jun 18, 2018

We still need the solution proposed in #21558 for the v2 API

Should we have a separate bug for these then? I just piggybacked on the bug you filed, but if they're separate issues, even if complementary, might be better to separate them.

@vanzin vanzin changed the title [SPARK-24552][core] Correctly identify tasks in output commit coordinator. [SPARK-24589][core] Correctly identify tasks in output commit coordinator. Jun 19, 2018
@vanzin
Contributor Author

vanzin commented Jun 19, 2018

FYI I plan to fix the mima issue later today (still fighting with 2.1 builds). Haven't decided whether to revert the change or just add excludes... probably the latter since it's a developer api.

@tgravescs
Contributor

tgravescs commented Jun 19, 2018

So I think the commit/delete thing is also an issue for the existing v1 and Hadoop committers. This doesn't fully solve the problem. Spark uses a file name format like (HadoopMapReduceWriteConfigUtil/HadoopMapRedWriteConfigUtil):

{date}_{rddid}_{m/r}_{partitionid}_{task attempt number}

I believe the same fix as for v2 would work: using the taskAttemptId instead of the attemptNumber.

In the case where we have a stage failure and a second stage attempt, the task attempt number could be the same, and thus both tasks write to the same place. If one of them fails or is told not to commit, it could delete the output that is being used by both.

I need to think through all the scenarios to make sure they're covered.
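
For illustration, a hedged sketch of the naming concern (the pattern mirrors the format quoted above; the helper object and method names are hypothetical, not the actual HadoopMapReduceWriteConfigUtil code):

object OutputNameSketch {
  // With attemptNumber (which restarts at 0 for every stage attempt), tasks for the
  // same partition in different stage attempts can produce the same name and step on
  // each other's files.
  def name(date: String, rddId: Int, side: String, partitionId: Int, attemptNumber: Int): String =
    s"${date}_${rddId}_${side}_${partitionId}_${attemptNumber}"

  // Using the scheduler's globally unique taskAttemptId instead gives every launched
  // task its own name, even across stage attempts.
  def uniqueName(date: String, rddId: Int, side: String, partitionId: Int, taskAttemptId: Long): String =
    s"${date}_${rddId}_${side}_${partitionId}_${taskAttemptId}"
}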

@jiangxb1987
Contributor

This in general looks good. IMO we should focus on fixing the output commit coordinator issue in this PR, and discuss the data source issue in a separate thread.
I'm OOO this week but will still look at this issue in more detail.

@tgravescs
Contributor

I'm fine with separating them, but we need a jira, or need to update the v2 jira, to handle all cases.

@tgravescs
Contributor

tgravescs commented Jun 20, 2018

Ah, right, d'oh. I just checked about whether stages register with the coordinator, and saw the duplicate registration for the resubmitted map stage.

Yeah, I noticed that too, but I think we should perhaps file a separate jira and only do that in 2.4 and maybe 2.3.2, just to limit changes for now.

@vanzin
Contributor Author

vanzin commented Jun 20, 2018

Sounds good to me (although I'm trying the change locally and unit tests are so far happy).

@vanzin
Contributor Author

vanzin commented Jun 20, 2018

I filed SPARK-24611 to track some enhancements to this part of the code that have been discussed here. Of those, I'd consider the "use task IDs instead of TaskIdentifier" as something we could potentially do here, but at the same time I don't really want to delay this patch too much.

@vanzin
Contributor Author

vanzin commented Jun 20, 2018

I pushed the change for that in: vanzin@e6a862e

In case anyone wants to take a look.

@SparkQA

SparkQA commented Jun 20, 2018

Test build #92138 has finished for PR 21577 at commit 264c533.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@tgravescs
Contributor

the code here lgtm. I was trying to make one more pass through all the scenarios but got stuck in meetings; I'll try to do it later tonight or tomorrow morning, but we can always have another follow-up if we find another case.

Contributor

@jiangxb1987 jiangxb1987 left a comment

LGTM too

@tgravescs
Contributor

+1

this is a bit of a side note: while looking through the scenarios I filed https://issues.apache.org/jira/browse/SPARK-24622. It shouldn't be a problem here with this fix, though.

@vanzin
Contributor Author

vanzin commented Jun 21, 2018

So, does anyone want to do the actual merging?

@tgravescs
Contributor

I will

@cloud-fan
Contributor

I pushed the change for that in: vanzin/spark@e6a862e

I like it; it's simpler to use the task id in place of the stage attempt id and task attempt id. For safety, we should do it in master only, after this PR is merged.
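
As a rough sketch of that simplification (illustrative only; this is the follow-up idea tracked separately, not part of this PR): since each launched task already has a globally unique id, the coordinator could record just that id per partition instead of a (stage attempt, task attempt) pair.

import scala.collection.mutable

// Hedged sketch: one unique task id per partition is enough to decide who commits.
object TaskIdBasedSketch {
  // partition -> unique id of the task allowed to commit it
  private val authorizedCommitters = mutable.Map[Int, Long]()

  def canCommit(partition: Int, taskId: Long): Boolean = synchronized {
    authorizedCommitters.get(partition) match {
      case None =>
        authorizedCommitters(partition) = taskId
        true
      case Some(existing) =>
        existing == taskId
    }
  }
}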

@asfgit asfgit closed this in c8e909c Jun 21, 2018
asfgit pushed a commit that referenced this pull request Jun 21, 2018
…ator.

Author: Marcelo Vanzin <vanzin@cloudera.com>

Closes #21577 from vanzin/SPARK-24552.

(cherry picked from commit c8e909c)
Signed-off-by: Thomas Graves <tgraves@apache.org>
asfgit pushed a commit that referenced this pull request Jun 21, 2018
@vanzin vanzin deleted the SPARK-24552 branch June 21, 2018 20:13
@tgravescs
Contributor

merged to master, 2.3, and 2.2

@zzcclp
Contributor

zzcclp commented Jun 22, 2018

@vanzin @tgravescs, after merging this PR into branch-2.2, there is an error "stageAttemptNumber is not a member of org.apache.spark.TaskContext" in SparkHadoopMapRedUtil; I think it needs PR-20082 to be merged first.

@tgravescs
Contributor

Yeah sorry about that, my fault. I merged the fix - SPARK-22897

vanzin pushed a commit to vanzin/spark that referenced this pull request Jun 22, 2018
@rdblue
Contributor

rdblue commented Jun 22, 2018

Thanks for fixing this, @vanzin!

MatthewRBruce pushed a commit to Shopify/spark that referenced this pull request Jul 31, 2018
jzhuge pushed a commit to jzhuge/spark that referenced this pull request Aug 20, 2018
jzhuge pushed a commit to jzhuge/spark that referenced this pull request Aug 20, 2018
turboFei pushed a commit to turboFei/spark that referenced this pull request Sep 4, 2018