[SPARK-24589][core] Correctly identify tasks in output commit coordinator. #21577
Conversation
…ator.

When an output stage is retried, it's possible that tasks from the previous attempt are still running. In that case, there would be a new task for the same partition in the new attempt, and the coordinator would allow both tasks to commit their output since it did not keep track of stage attempts.

The change adds more information to the stage state tracked by the coordinator, so that only one task is allowed to commit the output in the above case.

This also removes some code added in SPARK-18113 that allowed for duplicate commit requests; with the RPC code used in Spark 2, that situation cannot happen, so there is no need to handle it.
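A minimal, illustrative sketch of the idea (the names mirror those visible in the diffs below, such as TaskIdentifier and authorizedCommitters, but this is not the actual Spark implementation): the authorized committer for each partition is keyed by both the stage attempt and the task attempt number, so a task from a different attempt of the stage cannot also be granted the commit lock.

```scala
import scala.collection.mutable

// Illustrative sketch only, not Spark's OutputCommitCoordinator.
object CommitCoordinatorSketch {
  // A committer is identified by (stage attempt, task attempt), not just the task
  // attempt number, which can repeat across stage attempts.
  case class TaskIdentifier(stageAttempt: Int, taskAttempt: Int)

  class StageState(numPartitions: Int) {
    // null means "no committer authorized yet" for that partition.
    val authorizedCommitters = new Array[TaskIdentifier](numPartitions)
  }

  private val stageStates = mutable.Map[Int, StageState]()

  def stageStart(stage: Int, maxPartitionId: Int): Unit = synchronized {
    stageStates(stage) = new StageState(maxPartitionId + 1)
  }

  def stageEnd(stage: Int): Unit = synchronized {
    stageStates.remove(stage)
  }

  def canCommit(stage: Int, stageAttempt: Int, partition: Int, attemptNumber: Int): Boolean =
    synchronized {
      val state = stageStates(stage)  // assumes stageStart was called for this stage
      if (state.authorizedCommitters(partition) == null) {
        // First task to ask wins the commit lock for this partition.
        state.authorizedCommitters(partition) = TaskIdentifier(stageAttempt, attemptNumber)
        true
      } else {
        // Some task, possibly from another stage attempt, already holds the lock.
        false
      }
    }
}
```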
Test build #91949 has finished for PR 21577 at commit
Test build #91950 has finished for PR 21577 at commit
This was along the lines of what I was thinking as well. Will do a full review later. From the other PR:
Are there other docs that need to be updated for the v2 datasource API? @rdblue @cloud-fan
I think Ryan's change might still be good to introduce (i.e. a change that replaces the attempt id in that code with something a little more unique), regardless of any fix here. The unit tests I added artificially re-create the calls that would lead to the situation, but I haven't tried to create a test case that would run things through the scheduler.
@@ -81,7 +81,7 @@ object SparkHadoopMapRedUtil extends Logging {
logInfo(message)
// We need to abort the task so that the driver can reschedule new attempts, if necessary
committer.abortTask(mrTaskContext)
throw new CommitDeniedException(message, stageId, splitId, taskAttemptNumber)
throw new CommitDeniedException(message, ctx.stageId(), splitId, ctx.attemptNumber())
shall we also include stage attempt number in the exception?
Sure. Was trying to minimize changes in the first version, for testing.
Yes, we need that, but it can be done in a different PR
Thanks! The fix LGTM
I'll try to create a test to exercise this in a real job (aside from the exception changes @cloud-fan suggested), but wouldn't hold my breath.
Rename the field to match what it actually is; except for the JSON-serialized version, which for backwards compatibility still uses "job" instead of "stage".
state.authorizedCommitters(partition) = TaskIdentifier(stageAttempt, attemptNumber)
true
} else {
logDebug(s"Commit denied for stage=$stage/$attemptNumber, partition=$partition: "
would be nice to include the stage attempt in the log messages as well.
I tried to create a test based on actually running a job, but I'd have to do a lot of hacking to control what the result stage does, and it was starting to feel not much better than the unit test I added here already, so I gave up.
Test build #92039 has finished for PR 21577 at commit
logDebug(s"Authorized committer (attemptNumber=$attemptNumber, stage=$stage, " + | ||
s"partition=$partition) failed; clearing lock") | ||
stageState.authorizedCommitters(partition) = NO_AUTHORIZED_COMMITTER | ||
stageState.authorizedCommitters(partition) = null |
Nit: why not use Option[TaskIdentifier] and None here?
Less memory usage, at least. Not sure what advantage using Option would bring here.
s"attempt: $attemptNumber") | ||
case otherReason => | ||
case _: TaskCommitDenied => | ||
logInfo(s"Task was denied committing, stage: $stage / $stageAttempt, " + |
Nit: Should this be s"$stage.$stageAttempt"?
That looks better and saves some space, so will do.
@@ -399,7 +399,8 @@ private[spark] object JsonProtocol {
("Full Stack Trace" -> exceptionFailure.fullStackTrace) ~
("Accumulator Updates" -> accumUpdates)
case taskCommitDenied: TaskCommitDenied =>
("Job ID" -> taskCommitDenied.jobID) ~
("Job ID" -> taskCommitDenied.stageID) ~
("Job Attempt Number" -> taskCommitDenied.stageAttempt) ~
Why does this use "Job" and not "Stage"?
Also, will this affect the compatibility of the history server files?
For the new property, I'm just following what the old property says, even though it's wrong. I think having Job ID and Stage Attempt Number would just be even more confusing...
And it shouldn't affect compatibility. Given the code, even an old history server would be able to read these new log files.
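A small illustration of why that holds, using json4s (which JsonProtocol is built on); the reader helper below is hypothetical, not Spark's actual parser. An old history server simply never looks at the extra field, and a reader that defaults the missing field still handles old logs:

```scala
import org.json4s._
import org.json4s.jackson.JsonMethods.parse

// Hypothetical compatibility check, not Spark code.
object JsonCompatSketch extends App {
  implicit val formats: Formats = DefaultFormats

  // Event as written after this change (field names taken from the diff above).
  val newEvent = parse("""{"Job ID": 2, "Job Attempt Number": 1}""")
  // Event as written before this change, with no stage attempt field.
  val oldEvent = parse("""{"Job ID": 2}""")

  // Default the stage attempt to 0 when the field is absent, so old logs still parse.
  def stageAttemptOf(event: JValue): Int =
    (event \ "Job Attempt Number").toOption.map(_.extract[Int]).getOrElse(0)

  assert(stageAttemptOf(newEvent) == 1)
  assert(stageAttemptOf(oldEvent) == 0)
}
```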
+1. This fixes the commit coordinator problem where two separate tasks can be authorized. That case could lead to duplicate data (if, for example, both tasks generated unique file names using a random UUID). However, this doesn't address the problem I hit in practice, where a file was created twice and deleted once because the same task attempt number was both allowed to commit by the coordinator and denied commit by the coordinator (after the stage had finished). We still need the solution proposed in #21558 for the v2 API. But that's more of a v2 API problem because that API makes the guarantee that implementations can rely on the attempt ID.
@@ -109,20 +116,21 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean)
* @param maxPartitionId the maximum partition id that could appear in this stage's tasks (i.e.
* the maximum possible value of `context.partitionId`).
*/
private[scheduler] def stageStart(stage: StageId, maxPartitionId: Int): Unit = synchronized {
private[scheduler] def stageStart(stage: Int, maxPartitionId: Int): Unit = synchronized {
stageStates(stage) = new StageState(maxPartitionId + 1)
My memory is a bit rusty here, but are we changing the semantics of which task can commit here?
Couple of queries:
- Are we allowing a task from a previous stage attempt to commit for the current stage attempt (after the previous stage attempt has failed/finished)? Based on TaskIdentifier above, I think the answer is yes?
- If not, should we check and reject commit requests from tasks from an 'older' stage attempt when the current stage attempt is different?
I don't think the semantics are changing. It's always been racy, in that either of the concurrent tasks from different stage attempts may succeed first. And I'm almost sure the assumption is that both task attempts are equivalent (i.e. the output is deterministic or at least should be), so it should be fine for either to be committed.
The problem is that without this change the coordinator would allow both attempts to commit, and that is kinda bad.
There are two cases here (both not handled in existing/earlier code).
Handled in this PR:
- Stage S1 attempt A1 launched.
- Task T1_1 launched for partition P1.
- A1 fails.
- Stage S1 attempt A2 launched.
- Task T1_2 launched for partition P1.
- T1_1 finishes, and is allowed to commit.
IMO not handled in this PR:
- Stage S1 attempt A1 launched.
- Task T1_1.1 launched for partition P1.
- Task T1_1.2 launched for partition P1 (speculative).
- Task T1_1.1 committed.
- A1 fails.
- Stage S1 attempt A2 launched for some other pending partitions.
- Task T1_1.2 wants to commit.
T1_1.2 will be allowed to commit.
Now we have two tasks for the same partition successfully committing.
Did I miss something here?
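A hedged walkthrough of that second scenario, expressed against the illustrative sketch from the top of the thread (hypothetical code, not the real coordinator API): the stageEnd/stageStart pair between attempts wipes the record of T1_1.1's commit, so the late speculative task is granted the lock as well.

```scala
// Hypothetical replay of the "IMO not handled in this PR" scenario, using the sketch above.
object ScenarioTwoSketch extends App {
  import CommitCoordinatorSketch._

  stageStart(stage = 1, maxPartitionId = 0)
  // T1_1.1 (stage attempt A1) asks for partition P1 (index 0) and is authorized.
  assert(canCommit(stage = 1, stageAttempt = 1, partition = 0, attemptNumber = 1))

  // A1 fails; the coordinator forgets the stage, then attempt A2 starts.
  stageEnd(stage = 1)
  stageStart(stage = 1, maxPartitionId = 0)

  // T1_1.2, the speculative task left over from A1, asks late and is also granted.
  assert(canCommit(stage = 1, stageAttempt = 1, partition = 0, attemptNumber = 2))
  // Result: two tasks for the same partition were allowed to commit.
}
```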
T1_1.2 will not be allowed, it has a different task attempt number.
If I read @vanzin's PR right, T1_1.2 will be allowed to commit - since there is a stageEnd + stageStart in between (which clear the existing stage state).
Yeah, I think this can happen. The problem is that with the current way it's used, the output committer forgets the commit status between stage retries. I think the right thing would be for the committer to keep the stage-related state until the scheduler is done with all its attempts.
whether it will generate corrupted data when the commit process of T1_1.1 didn't finish
I don't think that's the problem. The problem is that if both the initial task and the speculative task finish successfully, but across stage attempt barriers (so the output committer is "reset" in between), both will be allowed to commit, so you get duplicate data.
So in scenario 2, once the first task finishes and is committed, the taskset manager will kill the speculative task T1_1.2. But since it sends an async message to kill the task, the task could actually try to commit after another task fails and causes the stage to remove itself from the output commit coordinator, and after it starts another stage attempt. So it could actually end up committing the task output for T1_1.2. I'm not sure this case by itself is a problem, though, since if it actually committed T1_1.1 and T1_1.2 is allowed to commit, they should have the same output and commitJob would handle it in at least most cases. The caveat there, though, would be that since T1_1.1 was committed, the second stage attempt could finish and call commitJob while T1_1.2 is committing, since Spark thinks it doesn't need to wait for T1_1.2. Anyway, this seems very unlikely but we should protect against it.
There is another case here where T1_1.1 could have just asked to be committed, but not yet committed; then if it gets delayed committing, the new stage attempt starts and T1_1.2 asks if it could commit and is granted, so then both try to commit at the same time, causing corruption.
I think the right thing would be for the committer to keep the stage-related state until the scheduler is done with all its attempts.
We should change the DAGScheduler a little bit so that, if a stage is killed and going to be re-tried, it does not clear the stage states in the output coordinator.
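A hedged sketch of that direction, again in terms of the illustrative code above rather than the real DAGScheduler/OutputCommitCoordinator hooks: stageStart keeps an existing StageState instead of replacing it, and stageEnd is only called once the scheduler is completely done with the stage.

```scala
// Hypothetical revision of the sketch's stageStart: preserve state across stage attempts.
def stageStart(stage: Int, maxPartitionId: Int): Unit = synchronized {
  stageStates.get(stage) match {
    case Some(existing) =>
      // A retry of a stage we already track: keep the authorized committers so that
      // stray tasks from earlier attempts stay locked out.
      require(existing.authorizedCommitters.length == maxPartitionId + 1)
    case None =>
      stageStates(stage) = new StageState(maxPartitionId + 1)
  }
}
// ...and stageEnd would only be called once the last attempt of the stage has finished.
```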
Yep, I'm going down that path. I just want to add a proper test to make sure the behavior is correct and that's a little bit tricky.
I agree @vanzin, @cloud-fan. We should remove the stage info only after the stage is done.
Should we have a separate bug for these then? I just piggybacked on the bug you filed, but if they're separate issues, even if complementary, it might be better to separate them.
FYI I plan to fix the MiMa issue later today (still fighting with 2.1 builds). Haven't decided whether to revert the change or just add excludes... probably the latter since it's a developer API.
So I think the commit/delete thing is an issue for the existing v1 and hadoop committers as well, so this doesn't fully solve the problem. Spark uses a file format like (HadoopMapReduceWriteConfigUtil/HadoopMapRedWriteConfigUtil):
I believe the same fix as the v2 one would work, using the taskAttemptId instead of the attemptNumber. In the case where we have a stage failure and a second stage attempt, the task attempt number could be the same, and thus both tasks write to the same place. If one of them fails or is told not to commit, it could delete the output which is being used by both. Need to think through all the scenarios to make sure it's covered.
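A small illustration of the distinction being discussed; the TaskContext accessors are real, but how they would be wired into the output path naming is left as an assumption here.

```scala
import org.apache.spark.TaskContext

// Illustrative only: attemptNumber() restarts from 0 for every stage attempt, so two tasks
// for the same partition launched in different stage attempts can share it and end up
// writing to (and later deleting) the same temporary output location. taskAttemptId() is
// unique across the application, so using it in the attempt identifier keeps them apart.
def describeAttempt(ctx: TaskContext): String =
  s"partition=${ctx.partitionId()} " +
    s"attemptNumber=${ctx.attemptNumber()} " +  // may collide across stage attempts
    s"taskAttemptId=${ctx.taskAttemptId()}"     // unique per task attempt in the app
```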
This in general looks good. IMO we should focus on fixing the output commit coordinator issue in this PR, and discuss the data source issue in a separate thread.
I'm fine with separating them, but we need a jira (or need to update the v2 jira) to handle all cases.
This reverts commit 5437d4a.
Yeah, I noticed that too, but I think we should perhaps file a separate jira and only do that in 2.4 and maybe 2.3.2, just to limit changes for now.
Sounds good to me (although I'm trying the change locally and unit tests are so far happy).
I filed SPARK-24611 to track some enhancements to this part of the code that have been discussed here. Of those, I'd consider the "use task IDs instead of TaskIdentifier" as something we could potentially do here, but at the same time I don't really want to delay this patch too much.
I pushed the change for that in vanzin@e6a862e, in case anyone wants to take a look.
Test build #92138 has finished for PR 21577 at commit
The code here LGTM. I was trying to make one more pass through all the scenarios but got stuck in meetings; will try to do it later tonight or tomorrow morning, but we can always have another follow-up if we find another case.
LGTM too
+1. This is a bit of a side issue I filed while looking through the scenarios: https://issues.apache.org/jira/browse/SPARK-24622. It shouldn't be a problem here though with this fix.
So, does anyone want to do the actual merging?
I will
I like it, it's simpler to use the task id to replace the stage attempt id and task attempt id. For safety we should do it in master only after this PR is merged.
…ator.

When an output stage is retried, it's possible that tasks from the previous attempt are still running. In that case, there would be a new task for the same partition in the new attempt, and the coordinator would allow both tasks to commit their output since it did not keep track of stage attempts.

The change adds more information to the stage state tracked by the coordinator, so that only one task is allowed to commit the output in the above case. The stage state in the coordinator is also maintained across stage retries, so that a stray speculative task from a previous stage attempt is not allowed to commit.

This also removes some code added in SPARK-18113 that allowed for duplicate commit requests; with the RPC code used in Spark 2, that situation cannot happen, so there is no need to handle it.

Author: Marcelo Vanzin <vanzin@cloudera.com>

Closes #21577 from vanzin/SPARK-24552.

(cherry picked from commit c8e909c)
Signed-off-by: Thomas Graves <tgraves@apache.org>
Merged to master, 2.3, and 2.2.
@vanzin @tgravescs, after merging this PR into branch-2.2 there is an error "stageAttemptNumber is not a member of org.apache.spark.TaskContext" in SparkHadoopMapRedUtil. I think PR-20082 needs to be merged first.
Yeah sorry about that, my fault. I merged the fix - SPARK-22897
…ator [branch-2.1].

When an output stage is retried, it's possible that tasks from the previous attempt are still running. In that case, there would be a new task for the same partition in the new attempt, and the coordinator would allow both tasks to commit their output since it did not keep track of stage attempts.

The change adds more information to the stage state tracked by the coordinator, so that only one task is allowed to commit the output in the above case. The stage state in the coordinator is also maintained across stage retries, so that a stray speculative task from a previous stage attempt is not allowed to commit.

This also removes some code added in SPARK-18113 that allowed for duplicate commit requests; with the RPC code used in Spark 2, that situation cannot happen, so there is no need to handle it.

Author: Marcelo Vanzin <vanzin@cloudera.com>

Closes apache#21577 from vanzin/SPARK-24552.

(cherry picked from commit c8e909c)
Signed-off-by: Thomas Graves <tgraves@apache.org>
(cherry picked from commit 751b008)
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
Thanks for fixing this, @vanzin!