[SPARK-51272][CORE]. Fix for the race condition in Scheduler causing failure in retrying all partitions in case of indeterministic shuffle keys #50033
base: master
Conversation
…in retrying all partitions in case of indeterministic shuffle keys
I will try to write a unit test which demonstrates the race condition; it can be part of the check-in.
…on which will fail if all partitions are not retried for indeterministic shuffle stage
Did a very rudimentary pass, I will go through it more thoroughly. Please remove the TODOs you added to the PR.
The provided unit test is invalid, as it uses multiple threads to process events, whereas the production code has a single dedicated thread, so a race condition is not possible.
If you still think it is a race condition and you can reproduce the problem with a standalone test, I suggest adding log lines to those places where you think the two threads are competing, and using the "%t" formatter in log4j2 to include the thread names in the log. In this case please attach the reproduction code without any production code change (only the new logging lines should be added; it is fine if the reproduction has to be retried 1000 times, as race conditions are flaky in nature, but I prefer the original production code) and attach the section of the logs where you think the race occurs.
The end-to-end bug reproduction without production code change is not feasible for me (at least at this point) in a single-VM unit test. The race condition is possible (and happens) because in the DAGScheduler::handleTaskCompletion method, asynchronicity is introduced by the following snippet of code
Let me see if I can modify the test and expose the race without using multiple threads at the top level.
What we have here is a producer-consumer pattern. The
DAGSchedulerEventProcessLoop extends EventLoop, where the onReceive() method is called from a single thread:
spark/core/src/main/scala/org/apache/spark/util/EventLoop.scala Lines 42 to 66 in 1da65be
The locking implementation itself also contains some bugs, but first let's focus on understanding the problem at hand.
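To make the threading model concrete, here is a minimal, illustrative sketch of that single-consumer pattern (not the actual EventLoop.scala, just the shape of it: many producer threads may post, but onReceive always runs on the one event thread):

import java.util.concurrent.LinkedBlockingDeque

// Illustrative only: producers call post() from many threads, while events are
// consumed and handled by the single eventThread, mirroring the pattern used
// by org.apache.spark.util.EventLoop.
abstract class ToyEventLoop[E](name: String) {
  private val eventQueue = new LinkedBlockingDeque[E]()

  private val eventThread = new Thread(name) {
    setDaemon(true)
    override def run(): Unit = {
      while (!Thread.currentThread().isInterrupted) {
        val event = eventQueue.take()   // blocks until an event is available
        onReceive(event)                // always invoked on this one thread
      }
    }
  }

  def start(): Unit = eventThread.start()

  // Any thread may post; only the event thread processes.
  def post(event: E): Unit = eventQueue.put(event)

  protected def onReceive(event: E): Unit
}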
… relying on concurrent processing of DagEvent, instead relying on concurrent addition of event to the event loop queue
@attilapiros: Reworked the test to reproduce the race condition by relying on concurrent addition of the DAGScheduler events to the event loop's queue.
The test reliably fails on current master. Also curious to know the issues you found with the use of locks.
@attilapiros @squito,
… always be in a single thread of the EventLoop, and that when the ResubmitFailedStages event is submitted, though it is done by the MessageScheduler (a separate thread pool), it is still posting to the event loop's queue instead of invoking onReceive directly, so the read/write locks in Stage are not needed
…on of beforeEach and afterEach in test
@ahshahid as the solution became very different, please update the description in the PR description too.
@attilapiros: Sincere thanks for the detailed review.
@attilapiros: I am also not sure about when you said: "There is another issue created for this, please check out https://issues.apache.org/jira/browse/SPARK-25342." Are you referring to Case 1 or Case 2? I have explained why, in the case of Case 1, the spurious result will not get committed. So does this answer the above statement of yours?
I focused on your "SPARK-51272: retry all the partitions of result stage, if the first result task has failed and failing ShuffleMap stage is inDeterminate" test, where the current solution is not enough as we need a proper revert. What I am looking for is a simpler solution; could you please look into this: it is a PR against my repo and it has two failed tests, but I am interested in your early opinion.
Those two failed tests in my PR were coming from the DAGSchedulerSuite changes, so I reverted those.
Do you think those assertions which you reverted are right?
Yes, they are correct. That stage has two tasks: one is running on hostC and the other is on hostD:
https://github.com/apache/spark/blob/00a4aadb8cfce30f2234453c64b9ca46c60fa07f/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala#L3160
The fetch failure was from the host called hostC:
https://github.com/apache/spark/blob/00a4aadb8cfce30f2234453c64b9ca46c60fa07f/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala#L3166
This caused an executor lost on hostC:
25/04/07 19:48:08.769 pool-1-thread-1-ScalaTest-running-DAGSchedulerSuite INFO DAGSchedulerSuite$MyDAGScheduler: Executor lost: hostC-exec (epoch 4)
So this removes the output which was made on hostC. This is how we get the assert right, but later, when the ResubmitFailedStages is handled, the execution goes to submitMissingTasks() where all the output is removed:
https://github.com/apache/spark/blob/2b3fb526c8bd8b486f280756d5282cc84f7473d7/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L1555
What about the retry of partitions? Because the failing stage is indeterminate, all the partitions should be retried, isn't it?
...but later when the ResubmitFailedStages is handled the execution goes to submitMissingTasks() where all the output is removed:
But that is true only for a shuffle stage, not for a result stage.
In the case of a result stage, output removal will not work; in this PR, that is handled.
Please check out https://github.com/attilapiros/spark/pull/8/files?diff=unified&w=1. Ignoring the whitespace, the production code is just 13 lines added to DAGScheduler, and there is no new state introduced. So the change is much simpler and easier to reason about.
Will check it tomorrow... BTW, if you take your change and the other PR 51016, and enable the HA test added in this PR of mine, please check if it passes.
Also I am confused: you said that in the simplified PR of yours, the problem of reversion of result tasks existed, implying that results could be wrong. Is that problem solved in the simplified PR?
No, even in the simplified one I just abort the stage when a revert would be needed.
I see. So let me ask one final question:
If, say, it's the first result task which has failed and is being processed by the event process thread, will there be, in this simplified PR, a window/situation where the query will be aborted? I guess yes?
In the PR which I have opened, that window is not there.
Now it's based on individual preference. I would gravitate towards my PR if it minimizes the aborts. However, if the simplified PR of yours also ensures that the window of abort is completely absent, then the simplified PR of yours is preferable.
Of course that is my personal view... and non-binding as a non-committer.
Regards,
Asif
I do not see any window there, but you can add a test if you suspect any.
@attilapiros: added a comment on your private PR.
Took a quick look - will do a more in-depth analysis later.
I want to make sure I am not missing anything here.
}
} else {
  mapOutputTracker.unregisterAllMapAndMergeOutput(
    mapStage.shuffleDep.shuffleId)
Add if/else for SHUFFLE_USE_OLD_FETCH_PROTOCOL.
Orthogonally, given we are in 4.0, perhaps it is time to drop SHUFFLE_USE_OLD_FETCH_PROTOCOL. Thoughts @attilapiros?
}

case resultStage: ResultStage if resultStage.activeJob.isDefined =>
  val numMissingPartitions = resultStage.findMissingPartitions().length
  if (numMissingPartitions < resultStage.numTasks) {
    // TODO: support to rollback result tasks.
    abortStage(resultStage, generateErrorMessage(resultStage), None)
  } else {
    resultStage.markAllPartitionsMissing()
(Here and in other places) If the ResultStage does not have any missing tasks - why are we failing it?
We are at this line, as per my understanding, because the first task of the current stage (a result stage) has failed, and the ResultStage is directly or indirectly dependent on an inDeterminate stage (that is, the ResultStage is an inDeterminate type of stage). The function call resultStage.markAllPartitionsMissing() sets a flag on this stage such that, while the stage is being resubmitted for re-execution (a separate thread adds it back to the event queue), any successful task arriving for some other partition in that window is rejected. If it were accepted, that would be the race, as it would otherwise result in a refetch of only some partitions (and not all).
This flag is checked at line 1881:
&& stage.isIndeterminate) || stage.shouldDiscardResult(task.stageAttemptId)
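For readers following along, here is a condensed, non-authoritative sketch of the stage-side state described above, pieced together from the fragments visible in this review (markAllPartitionsMissing, shouldDiscardResult, discardResultsForAttemptId); the surrounding class and attempt bookkeeping are simplified placeholders, not the PR's exact code:

// Sketch only: approximates the flag described above, not the actual diff.
class ResultStageSketch(val numTasks: Int) {
  // Latest attempt whose (possibly late-arriving) results must be discarded;
  // -1 means nothing is being discarded.
  private var discardResultsForAttemptId: Int = -1
  private var latestAttemptId: Int = 0

  // Called just before ResubmitFailedStages is posted: the retry must treat
  // every partition of the failed attempt as missing.
  def markAllPartitionsMissing(): Unit = {
    discardResultsForAttemptId = latestAttemptId
  }

  // Checked when a task-completion event is handled: a "success" from an
  // attempt at or below the marked one is rejected rather than recorded.
  def shouldDiscardResult(attemptId: Int): Boolean =
    discardResultsForAttemptId >= attemptId

  // Once a new attempt actually starts, the mark no longer applies to it.
  def startNewAttempt(): Unit = { latestAttemptId += 1 }
}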
    resultStage.markAllPartitionsMissing()
  }

case _ =>
failedStage has already been marked as failed. Given mapStage is not indeterminate, but only failedStage (the 'reducer' stage, which is a ResultStage) is - we only need to recompute the failed partitions for it, not all partitions.
This is the case where the ResultStage is failing due to its first task. The shuffle ID which caused the result stage to fail is Determinate. But the ResultStage (in the case of a join) is dependent on two shuffle stages, where the other shuffle stage is inDeterminate (which makes the ResultStage also inDeterminate).
Now at this point it is not known whether the other, inDeterminate shuffle stage has also failed or not.
Assuming it has also failed, if one of the result tasks is marked successful, or if not all partitions are retried, then we will get wrong results (the commented HA test can fail sporadically due to this problem).
So in this case too, the behaviour needs to be the same as in the previous one (where the failing shuffle stage is inDeterminate).
}

override def shouldDiscardResult(attemptId: Int): Boolean =
  this.discardResultsForAttemptId >= attemptId
Assuming both use cases for markAllPartitionsMissing are not required (based on the discussion above), we can remove these changes.
Unless we decide to aggressively abort the query even if it's the first task which is failing (and the resultStage is inDeterminate), this piece of code and the two above are needed, IMO, to avoid the race condition.
@mridulm IMHO, regarding an indeterministic result stage, we should abort the stage more aggressively, as we cannot re-execute any of its tasks twice: on the executor side, repeating the operation with different data can lead to corrupted results. One example is using the FileOutputCommitter; I cannot see any guarantees about re-executing a Hadoop task commit with different data. The other, better example is writing to an external DB via JDBC. Here you can see it iterates over the partitions and issues INSERT INTOs: spark/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala Lines 990 to 992 in 1fa05b8
where one INSERT INTO is SQL-dialect specific: spark/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala Line 983 in 1fa05b8
So any re-execution will duplicate the data. I think this is why https://issues.apache.org/jira/browse/SPARK-25342 was opened. @mridulm WDYT?
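To illustrate the duplication concern in isolation, here is a self-contained sketch of a per-partition JDBC writer that commits as soon as its own rows are written; the URL and table name are made-up placeholders, and this is not the JdbcUtils code, but it shows why re-running an already-finished task inserts the rows a second time:

import java.sql.DriverManager

// Hypothetical example: url and target_table are placeholders, not from the PR.
def writePartition(rows: Iterator[(Int, String)], url: String): Unit = {
  val conn = DriverManager.getConnection(url)
  try {
    conn.setAutoCommit(false)
    val stmt = conn.prepareStatement("INSERT INTO target_table VALUES (?, ?)")
    rows.foreach { case (id, value) =>
      stmt.setInt(1, id)
      stmt.setString(2, value)
      stmt.executeUpdate()
    }
    // The commit happens unconditionally once the iterator is exhausted.
    // If the scheduler later re-executes this task (speculation, or an
    // indeterminate-stage retry), these rows are inserted a second time.
    conn.commit()
  } finally {
    conn.close()
  }
}

Calling something like rdd.foreachPartition(it => writePartition(it, url)) would then duplicate rows for any partition whose task is re-executed.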
@attilapiros I am not entirely sure about the specific context of your comment :-) If you mean this or this, then that is no different from how speculative tasks for indeterminate stage(s) behave: if all partitions had completed and committed, they don't need to be recomputed. For the first, the map stage is determinate - so re-execution will not change the input data for the 'reducer' (though it can change the order) - same as speculative execution for a partition of this result stage. SPARK-25342 handles the case where a stage which is indeterminate needs to be re-executed - and so impacts the DAG of all child stages. Do let me know if I am missing something! (Note: Edited for clarity)
Even if the map stage is determinate, a fetch failure will lead to an executor lost, which can remove map output of the indeterminate stage; when the result stage is resubmitted, the indeterminate parent will be detected as missing and will be resubmitted. So when we have 3 stages:
A I believe this is tested as spark/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala Lines 3222 to 3271 in 2d3cb78
Although the test name says "failing ShuffleMap stage is inDeterminate", the shuffleId1 used at the fetch failure belongs to the determinate stage.
Can it be that in this case even speculative tasks lead to errors when writing to JDBC? As repeated
The indeterminate stage here is a For the case of shuffle map stage, With that out of the way, let us look at the scenario described:
There are a bunch of cases here, and we will need to analyze their impact. I am focusing on two main cases: For the common case, when Having said that: I will need to recheck if That is, if
Does this align with your observations/analysis @attilapiros?
Specifically about JDBC - assuming it is not due to the case we discussed above - I am not entirely sure :-)
We are getting closer.
There is transaction management for writing the rows of a partition (so for a task): spark/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala Lines 804 to 807 in 1fa05b8
But the re-execution of a task will do the duplication, as I see it.
Just checked the JDBC integration you referenced @attilapiros; the issue is that we are always committing when all rows are done processing. Instead, we should commit the transaction only as part of the commit protocol - when the driver lets the task know it can commit. This will be an issue even without any determinism issue, for example when there is speculative execution - as a race condition.
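A hedged sketch of that suggestion: the commit is gated on a driver-side decision instead of happening unconditionally. The canCommit callback below is a hypothetical stand-in for Spark's output commit coordination, not an existing JdbcUtils API:

import java.sql.Connection

// Hypothetical callback: in real Spark this role is played by the driver-side
// commit coordination; the signature here is illustrative only.
def writePartitionWithCommitProtocol(
    conn: Connection,
    rows: Iterator[(Int, String)],
    canCommit: () => Boolean): Unit = {
  conn.setAutoCommit(false)
  val stmt = conn.prepareStatement("INSERT INTO target_table VALUES (?, ?)")
  rows.foreach { case (id, value) =>
    stmt.setInt(1, id)
    stmt.setString(2, value)
    stmt.executeUpdate()
  }
  // Only one attempt per partition is allowed to commit; losers roll back,
  // so a speculative or re-executed attempt cannot duplicate the rows.
  if (canCommit()) conn.commit() else conn.rollback()
}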
But do we want one more situation where it breaks? And my other concern is Hadoop's file output committer: I cannot see any guarantees about what will happen if a Hadoop task is re-committed with different data (because of indeterminism we might have different data).
… for each of shuffle stages so that stages are not unnecessarily retried because of shared host names
@attilapiros the way to handle this in Spark is to leverage the output committer / commit protocol, based on what is described in your observations and analysis.
You cannot 're-commit' with different data: this is why the result stage is failed in case of indeterminism.
What changes were proposed in this pull request?
In the DAGScheduler code, a race condition exists due to concurrency in the addition of DAGScheduler events to the EventLoop queue.
Changeset 1
In the case of a FetchFailure for an indeterministic task, there is a window where a successful partition task completion event can get added to the event queue before the ResubmitFailedStages event is added for processing.
The code change is to mark the stage such that it treats all partitions as missing, just before submitting the ResubmitFailedStages event to the queue.
So while creating a fresh retry attempt for the stage, the missing partitions will be assumed to be all the partitions, even if a partition task completes successfully in the window between the submission of ResubmitFailedStages and the retry of the stage.
Once the retry attempt number increases, the mark on the stage can be removed.
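As a rough, self-contained sketch of the ordering this changeset relies on (mark first, then schedule the delayed ResubmitFailedStages post, mirroring how DAGScheduler resubmits failed stages via its message scheduler after a short timeout); every name below is illustrative, not the exact diff:

import java.util.concurrent.{Executors, TimeUnit}

// Toy model of the ordering described above; all names are illustrative.
object ResubmitSketch {
  private val messageScheduler = Executors.newScheduledThreadPool(1)

  def onFetchFailureOfIndeterminateStage(
      markAllPartitionsMissing: () => Unit,   // step 1: close the race window
      postResubmitFailedStages: () => Unit    // step 2: delayed resubmission
  ): Unit = {
    // Mark before scheduling, so a "successful" completion event already
    // sitting in the queue gets discarded instead of recorded.
    markAllPartitionsMissing()
    messageScheduler.schedule(new Runnable {
      override def run(): Unit = postResubmitFailedStages()
    }, 200L, TimeUnit.MILLISECONDS)
  }
}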
With the above changes, it turns out that two existing tests have wrong assertions, IMO, as all the partitions are not subjected to being retried.
Changeset 2
In case a ResultStage's first partition task fails with a FetchFailure, and the ResultStage is dependent on 2 shuffle stages such that ShuffleStage1 is Determinate while ShuffleStage2 is InDeterminate: if the FetchFailure corresponds to ShuffleStage1 (the Determinate one), even then the ResultStage should retry all the partitions and not accept any incoming successful partition from the failed attempt. Although the failing shuffle stage is determinate, there is no guarantee at this point that ShuffleStage2, which is inDeterminate, has not also failed. If it has failed, and a successful task corresponding to a partition from the failed stage attempt comes in, it is going to create data corruption, as the inDeterminate stage will get re-executed.
Why are the changes needed?
There is a race condition where a successful task completion, concurrent with a task failure for an inDeterminate stage, results in a situation where, instead of re-executing all partitions, only some are retried. This results in data loss.
The race condition identified is as follows:
a) A successful result stage task has yet to be marked as true in the boolean array tracking partition success/failure.
b) A concurrent failed result task, belonging to an InDeterminate stage, identifies all the stages which need to / can be rolled back. For the result stage, it looks into the array of successful partitions. As none is marked as true, the ResultStage and dependent stages are delegated to the thread pool for retry.
c) Between the time of collecting the stages to roll back and the retry of the stages, the successful task marks its boolean as true.
d) The retry of the stage, as a result, misses the partition marked as successful.
The above race condition also exists for the case where the ResultStage is InDeterminate, but the failing shuffle stage is Determinate and the ResultStage depends on both shuffle stages.
An existing test (core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala) has incorrect assertions regarding the number of partitions being retried for an inDeterminate stage.
Does this PR introduce any user-facing change?
No
How was this patch tested?
Added a unit test reproducing the race condition.
Have a functional test which exposes the following:
Data loss due to buggy Stage.isInDeterminate SPARK-51016
Race condition causing data loss, even if the above bug's PR is taken.
Have added a functional HA test which is disabled as of now; it will pass in all iterations only if this PR and the PR corresponding to issue https://issues.apache.org/jira/browse/SPARK-51016
, i.e. #50029, are also taken.
I have created a test PR which combines the changes in this PR and the PR for SPARK-51016, and has the SparkHASuite test enabled, for reference:
spark-51016 -51272-combined-with-HA-test-enabled
The functional issue with the race condition can also be reproduced using a single-VM JUnit test, but that requires source code modification.
Attaching two files for reproducing the functional bug, showing the race condition causing data corruption:
bugrepro.patch
BugTest.txt
The patch is needed to coax the single-VM test to reproduce the issue. It has lots of interception and tweaks to ensure the system is able to hit the data loss situation
(like each partition writing only a shuffle file containing keys evaluating to the same hashCode, deleting the shuffle file at the right time, etc.).
The BugTest itself:
a) If bugrepro.patch is applied to the current master and the BugTest is run, it will fail immediately with an assertion failure where, instead of 12 rows, 6 rows show up in the result.
b) If bugrepro.patch is applied on top of PR-SPARK-51016, then the BugTest will fail after one, two or more iterations, indicating the race condition in the DAGScheduler/Stage interaction.
c) But if the same BugTest is run on a branch containing the fix for this bug as well as PR-SPARK-51016, it will pass in all 100 iterations.
Was this patch authored or co-authored using generative AI tooling?
No