[SPARK-24552][SQL] Use task ID instead of attempt number for v2 writes. #21558
Conversation
@cloud-fan, this is a work-around for SPARK-24552. I'm not sure of the right way to fix this besides fixing the scheduler so that it doesn't use task attempt numbers twice, but I think this works.
It's a little weird to use the attempt id here anyway; the stage ID can change for a different execution of the same stage, IIRC, and that would reset the attempt id. @squito probably remembers a lot more about this off the top of his head.
IMO your change is the right fix, not just a workaround. I don't think it's a scheduler bug (though it's definitely unclear). I'll move that discussion to the jira. An alternative would be using the task attempt id, or combining the stage attempt number with the task attempt number.
hmm, the only place I could imagine that happening is with a shared shuffle dependency between jobs, which gets renumbered and then skipped, but then perhaps re-executed on a fetch-failure. That isn't relevant here, though, since it would only be shuffle map stages, not result stages.
For a write job, the writing only happens at the last stage, so the stage id doesn't matter, and the data source v2 API assumes that the partition id and attempt number identify a write task uniquely. So the problem here is: when we retry a stage, Spark doesn't kill the tasks of the old stage and just launches tasks for the new stage. We may have running write tasks that have the same partition id and attempt number as tasks in the new stage. One solution is, we can just use the task attempt id, which is unique. Or we can use the stage attempt id combined with the task attempt number.
As @squito suggested, we can either use taskAttemptId or combine stageAttemptId and taskAttemptNumber together; either of these can uniquely identify a task attempt.
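For reference, a minimal sketch of what the two options look like against the TaskContext API. This is not part of the patch; the helper name and the 16-bit shift are illustrative assumptions, not anything proposed in this thread:

```scala
import org.apache.spark.TaskContext

// Illustrative only: two ways to derive an identifier that stays unique across stage retries.
def uniqueWriteAttemptId(ctx: TaskContext): Long = {
  // Option 1: the globally unique task attempt id (the "TID"), a one-up counter per application.
  val byTid: Long = ctx.taskAttemptId()

  // Option 2: fold the stage attempt number into the per-stage task attempt number,
  // unique for a given (stage id, partition id) as long as neither value exceeds 16 bits.
  val byCombination: Long = (ctx.stageAttemptNumber().toLong << 16) | ctx.attemptNumber()

  byTid // the change in this PR uses option 1
}
```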
I think that's something that should be fixed, but it wouldn't entirely fix the problem unless we were very careful about ordering in the driver: the stage would have to fail, then stop allowing commits, then wait for all of the tasks that were allowed to commit to finish, and account for the coordination messages being in flight. Not an easy problem. I'd like to see a fix that makes the attempt number unique within a job and partition, i.e., no two tasks should have the same (job id, partition id, attempt number) triple, as Wenchen said.
@squito, part of the problem is that the output commit coordinator -- which ensures only one attempt of a task commits -- relies on the attempt number to allow or deny commits. If this is the correct fix, should we also pass the TID as the attempt id to the commit coordinator?
Test build #91794 has finished for PR 21558 at commit
Sorry, trying to catch up on this thread. Are we saying this is a bug in the existing output committer (not only the new v2 datasource) as well when we have a fetch failure?
So I looked through the code and it certainly appears to be a bug in the existing code (not just the v2 datasource API). If you have one stage running that gets a fetch failure, and it leaves any tasks running with attempt 0, those could conflict with the restarted stage since its tasks would all start with attempt 0 as well. When I say it could, I mean it would be a race if they go to commit at about the same time. It's probably more of an issue if one commits and then starts the job commit, and the other task then starts to commit its output; you could end up with an incomplete/corrupt file. We should see the warning "Authorizing duplicate request to commit" in the logs if this occurs, though. @rdblue does this match what you are seeing?
I took a look at the output coordinator code and, depending on how the scheduler behaves, it might be ok. The coordinator will deny commits for finished stages; so it depends on the order of things. If the failed attempt is marked as "failed" before the next attempt starts, then it's ok, even if tasks for the failed attempt are still running. Looking at the code handling
Actually, scratch that, there is really a problem. The output committer only checks the stage ID, not the stage attempt ID, so it will still allow tasks from the failed attempt to commit...
Yeah, that is the code that I was still looking at to verify if it can actually happen. On a fetch failure the DAG scheduler removes the stage from the output committer, but when the new stage attempt starts it just puts the stage back with the stage id (not the attempt id).
I guess https://issues.apache.org/jira/browse/SPARK-24492 is potentially caused by the output committer issue?
I'm not sure, but I'm starting to think the part of the fix for SPARK-18113 that added the "Authorizing duplicate request..." stuff should be removed. The rest of that change replaces
(Or, alternatively, the output committer could track task IDs instead of attempt numbers. That should result in the same behavior, but I haven't really looked at that.)
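To make that alternative concrete, here is a toy sketch. It is not Spark's actual OutputCommitCoordinator; the class and method names are made up, and it only illustrates keying commit permission on a globally unique task attempt id rather than a per-stage attempt number:

```scala
import scala.collection.mutable

// Toy model only: the first task attempt id to ask for a (stage, partition) wins.
// A task from a retried stage with a reused attempt *number* still has a different TID,
// so it cannot be mistaken for the attempt that was already authorized.
class ToyCommitCoordinator {
  private val authorized = mutable.Map[(Int, Int), Long]()

  def canCommit(stageId: Int, partitionId: Int, taskAttemptId: Long): Boolean = synchronized {
    authorized.get((stageId, partitionId)) match {
      case Some(winner) => winner == taskAttemptId // only the first grantee may commit
      case None =>
        authorized((stageId, partitionId)) = taskAttemptId
        true
    }
  }
}
```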
I started with some ideas in #21577; I haven't tested that PR aside from the modified unit test, but I think it's in the right direction. I plan to work more on it Monday, but leaving it there in case people are around over the weekend and want to comment.
@tgravescs, that's exactly what we're seeing. I think it might just be misleading to have a stage-local attempt ID, although it is more friendly for users and matches what MR does. @jiangxb1987, we see SPARK-24492 occasionally (it has gotten better with later fixes to the coordinator) and haven't tracked down the cause yet. If this were the underlying cause that would be great, but I'm not sure how it could be the cause. If the same attempt number is reused, then two tasks in different stage attempts may both be authorized to commit. That wouldn't cause the retries because it wouldn't cause extra commit denials.
e9e776a to 6c60d14
I think the right thing to do for this commit is to use the task ID instead of the stage-local attempt number. I've updated the PR with the change so I think this is ready to commit. @vanzin, are you okay committing this? cc @cloud-fan
I'm fine with this; my PR should fix the underlying issue but this still seems like a good idea to me.
But note that the task ID is also not necessarily unique (since you can have multiple attempts of the same task). So perhaps you should consider Imran's suggestion of mixing stage and task attempt numbers.
@vanzin, the ID that this uses is the TID, which I thought was always unique. It appears to be a one-up counter. Also, I noted on your PR that both are needed because even if we only commit one of the attempts, the writers may use this ID to track and clean up data.
Test build #92043 has finished for PR 21558 at commit
Retest this please.
If that's the case, and a different attempt for the same partition will get a different task ID, that's fine. I was under the impression it would have the same task ID.
Yes, I just checked and speculative attempts do get a different TID. Just turn on speculation, run a large stage, and sort tasks in a stage by TID. There aren't duplicates.
Actually, I didn't mean speculation, but something like this:
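(The snippet that was posted here isn't preserved. A minimal reconstruction of the kind of forced task retry being described, assuming a SparkContext named `sc` and `spark.task.maxFailures > 1`, might look like this:)

```scala
import org.apache.spark.TaskContext

// Fail every task's first attempt so the scheduler retries it, then compare the TIDs
// of the first and second attempts in the UI / event log.
sc.parallelize(1 to 10, 2).map { i =>
  if (TaskContext.get().attemptNumber() == 0) {
    throw new RuntimeException("failing the first attempt on purpose")
  }
  i
}.collect()
```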
Anyway I ran that and the behavior is the same (different attempt = different task ID) so it's all good.
Test build #92050 has finished for PR 21558 at commit
```diff
@@ -110,7 +108,7 @@ object DataWritingSparkTask extends Logging {
       useCommitCoordinator: Boolean): WriterCommitMessage = {
     val stageId = context.stageId()
     val partId = context.partitionId()
-    val attemptId = context.attemptNumber()
+    val attemptId = context.taskAttemptId().toInt
```
nit: can you rename this variable to tid? These names are pretty confusing, but I think that at least "tid" is used consistently and exclusively for this, while "attempt" means a lot of different things.
I was going to suggest removing the cast to int, but well, that's in DataWriterFactory and would be an API breakage... Hopefully it won't cause issues aside from weird output names when the value overflows the int.
HadoopWriteConfigUtil has the same issue: it's a public interface and uses an int for the attempt number.
It seems somewhat unlikely, but it's more likely to go over an int for task ids in Spark than in, say, MapReduce. We do have partitionId as an Int, so if partitions go up to the Int range and you have task failures, then task ids could go over an Int. Looking at our options
@rdblue, would you want to update this for the v1 and Hadoop committers? It should be very similar to this change: createTaskAttemptContext should take the actual task attempt id (type Long) instead of the attempt number as an argument. I would need to look at the v1 writers to make sure there is nothing else, but perhaps you are more familiar with them. If not we can split that into a separate jira.
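As a rough sketch of what that could look like on the v1 / Hadoop side (an assumption about the shape of the fix, not the actual patch; the helper name is made up), one option that keeps the existing Int-typed parameters is to fold the stage attempt number into the high bits:

```scala
import org.apache.spark.TaskContext

// Illustrative: an attempt "number" that stays unique across stage retries while still
// fitting the Int parameters used by the Hadoop write path, assuming neither component
// exceeds 16 bits.
def hadoopTaskAttemptNumber(ctx: TaskContext): Int = {
  (ctx.stageAttemptNumber() << 16) | ctx.attemptNumber()
}
```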
Ping @rdblue ^. If I don't hear by tomorrow, I will file a separate jira.
FYI, I'm preparing my own version of this PR with the remaining feedback addressed. Ryan was on paternity leave and I don't know whether he's done yet, so he may not be that responsive. This will conflict with the output commit coordinator change in any case, so one of them needs to wait (and that one is further along).
Ah ok, I was looking at my own version as well. There are other things we should update for v2 too: other functions with the same variable names, the description in DataWriterFactory.java, etc.
If you are working on this, I'll merge the other one, wait for you, and continue to investigate in parallel.
See link above for the updated PR.
@vanzin, thanks for working on this. I was out most of this week at a conference and I'm still on just half time, which is why I was delayed. Sorry to leave you all waiting. I'll make comments on your PR.
…ites.

This passes the unique task attempt id instead of attempt number to v2 data sources because attempt number is reused when stages are retried. When attempt numbers are reused, sources that track data by partition id and attempt number may incorrectly clean up data because the same attempt number can be both committed and aborted.

For v1 / Hadoop writes, generate a unique ID based on available attempt numbers to avoid a similar problem.

Closes apache#21558

Author: Marcelo Vanzin <vanzin@cloudera.com>
Author: Ryan Blue <blue@apache.org>

Closes apache#21606 from vanzin/SPARK-24552.2.

Ref: LIHADOOP-48531
What changes were proposed in this pull request?
This passes the unique task attempt id instead of attempt number to v2 data sources because attempt number is reused when stages are retried. When attempt numbers are reused, sources that track data by partition id and attempt number may incorrectly clean up data because the same attempt number can be both committed and aborted.
How was this patch tested?
Existing tests.