
[SPARK-51272][CORE]. Fix for the race condition in Scheduler causing failure in retrying all partitions in case of indeterministic shuffle keys #50033


Open
wants to merge 43 commits into base: master

Conversation


@ahshahid ahshahid commented Feb 21, 2025

What changes were proposed in this pull request?

In the DAGScheduler code, a race condition exists due to the concurrent addition of DAGScheduler events to the EventLoop queue.

Changeset 1
In case of a FetchFailure for an indeterminate task, there is a window before ResubmitFailedStages is added to the event queue for processing, during which a successful partition task completion event can be added to the event queue ahead of the ResubmitFailedStages event.
The code change is to mark the stage so that it treats all partitions as missing, just before submitting the ResubmitFailedStages event to the queue.
As a result, when a fresh retry attempt is created for the stage, the missing partitions are assumed to be all the partitions, even if a partition task completes successfully in the window between the submission of ResubmitFailedStages and the retry of the stage.

Once the retry attempt number increases, the mark on the stage can be removed.

With the above changes, it turns out that two existing tests have incorrect assertions, in my opinion, as they do not expect all partitions to be retried.
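To make the marking mechanism of Changeset 1 concrete, here is a hedged sketch using the member names that appear later in this PR's diff (markAllPartitionsMissing, shouldDiscardResult, discardResultsForAttemptId); the surrounding class shape and method bodies are assumptions, not the PR's exact code:

```
// Minimal sketch: a result stage remembers the attempt whose late successes must be
// discarded once the stage has been marked for a full retry.
class ResultStageStateSketch(var latestAttemptNumber: Int) {
  // -1 means no attempt is currently being discarded
  private var discardResultsForAttemptId: Int = -1

  // Called just before ResubmitFailedStages is posted: any completion event that still
  // carries the current (failed) attempt id will be ignored afterwards.
  def markAllPartitionsMissing(): Unit = {
    discardResultsForAttemptId = latestAttemptNumber
  }

  // Checked when a task completion arrives; the mark naturally stops applying once a
  // newer attempt (with a higher attempt number) is running.
  def shouldDiscardResult(attemptId: Int): Boolean =
    discardResultsForAttemptId >= attemptId
}
```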

Changeset 2
Consider a ResultStage whose first partition task fails with a FetchFailure, where the ResultStage depends on two shuffle stages: ShuffleStage1 is determinate, while ShuffleStage2 is indeterminate. Even if the FetchFailure corresponds to ShuffleStage1 (the determinate one), the ResultStage should still retry all of its partitions and should not accept any incoming successful partition from the failed attempt. Although the failing shuffle stage is determinate, there is no guarantee at this point that ShuffleStage2, which is indeterminate, has not also failed. If it has failed, and a successful task for a partition computed from the failed attempt is accepted, data corruption will result, as the indeterminate stage will get re-executed.

Why are the changes needed?

There is a race condition where a successful task completion, concurrent with a task failure for an indeterminate stage, results in only some partitions being retried instead of all of them. This results in data loss.
The race condition identified is as follows:
a) A successful result-stage task has not yet been marked as true in the boolean array tracking partition success/failure.
b) A concurrent failed result task, belonging to an indeterminate stage, identifies all the stages which need to / can be rolled back. For the ResultStage, it looks into the array of successful partitions. As none is marked true, the ResultStage and its dependent stages are delegated to the thread pool for retry.
c) Between the time the stages to roll back are collected and the stages are retried, the successful task marks its boolean as true.
d) As a result, the retry of the stage misses the partition that was marked as successful.

The above race condition also exists for the case where the ResultStage is indeterminate, but the failing shuffle stage is determinate and the ResultStage depends on both shuffle stages.
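To make the ordering concrete, below is a minimal, self-contained sketch (plain Scala, not Spark code; the event names are illustrative) of how a deferred ResubmitFailedStages post can be preceded in the queue by a task-success event, which the single consumer thread then handles against the old, failed attempt:

```
import java.util.concurrent.{Executors, LinkedBlockingQueue, TimeUnit}

object EventOrderingSketch {
  sealed trait Event
  case object TaskSuccess extends Event
  case object ResubmitFailedStages extends Event

  def main(args: Array[String]): Unit = {
    val queue = new LinkedBlockingQueue[Event]()

    // Single consumer thread, mirroring the DAGScheduler event loop.
    val consumer = new Thread(() =>
      while (true) {
        queue.take() match {
          case TaskSuccess          => println("handled TaskSuccess against the failed attempt")
          case ResubmitFailedStages => println("handled ResubmitFailedStages (stage retried)")
        }
      })
    consumer.setDaemon(true)
    consumer.start()

    // The fetch-failure handler defers posting ResubmitFailedStages (as DAGScheduler
    // does via messageScheduler.schedule with RESUBMIT_TIMEOUT)...
    val delayed = Executors.newSingleThreadScheduledExecutor()
    delayed.schedule(new Runnable {
      override def run(): Unit = queue.put(ResubmitFailedStages)
    }, 200, TimeUnit.MILLISECONDS)

    // ...which leaves a window for a concurrent task-success event to be queued first,
    // so it is handled before the stage is marked for retry.
    queue.put(TaskSuccess)

    Thread.sleep(500)
    delayed.shutdown()
  }
}
```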

An existing test (core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala) has incorrect assertions regarding the number of partitions being retried for an indeterminate stage.

Does this PR introduce any user-facing change?

No

How was this patch tested?

Added a unit test reproducing the race condition.
Have a functional test which exposes the following:
Data loss due to buggy Stage.isIndeterminate SPARK-51016
Race condition causing data loss, even if the above bug's PR is taken.
Have added a functional HA test which is disabled as of now; it will pass in all iterations only if this PR and the PR corresponding to issue https://issues.apache.org/jira/browse/SPARK-51016,
i.e. #50029, are both taken.

I have created a test PR which combines the changes in this PR with the PR for SPARK-51016, and has the SparkHASuite test enabled, for reference:
spark-51016 -51272-combined-with-HA-test-enabled

The functional issue with the race condition can also be reproduced in a single-VM JUnit test, but that requires source code modification. For that, I am attaching two files which reproduce the functional bug and show the race condition causing data corruption:

bugrepro.patch: needed to coax the single-VM test into reproducing the issue. It contains lots of interception and tweaks to ensure the system is able to hit the data loss situation (for example, each partition writes only a shuffle file containing keys evaluating to the same hashCode, and the shuffle file is deleted at the right time).
BugTest.txt: the BugTest itself.

a) If bugrepro.patch is applied to current master and the BugTest is run, it fails immediately with an assertion failure: instead of 12 rows, only 6 rows show up in the result.

b) If bugrepro.patch is applied on top of PR-SPARK-51016, the BugTest fails after one or more iterations, indicating the race condition in the DAGScheduler/Stage interaction.

c) But if the same BugTest is run on a branch containing the fix for this bug as well as PR-SPARK-51016, it passes in all 100 iterations.

Was this patch authored or co-authored using generative AI tooling?

No

…in retrying all partitions in case of indeterministic shuffle keys
@github-actions github-actions bot added the CORE label Feb 21, 2025
@ahshahid
Author

I will try to write a unit test which demonstrates the race condition, which can be part of the check-in.

@ahshahid ahshahid changed the title SPARK-51272. Fix for the race condition in Scheduler causing failure … [SPARK-51272][CORE]. Fix for the race condition in Scheduler causing failure in retrying all partitions in case of indeterministic shuffle keys Feb 21, 2025
…on which will fail if all partitions are not retried for indeterministic shuffle stage
Contributor

@sririshindra sririshindra left a comment


Did a very rudimentary pass, I will go through it more thoroughly. Please remove the TODOs you added to the PR.

Contributor

@attilapiros attilapiros left a comment


The provided unit test is invalid, as it uses multiple threads to process events whereas the production code has a single dedicated thread, so a race condition is not possible.

If you still think it is a race condition and you can reproduce the problem with a standalone test, I suggest adding log lines to those places where you think the two threads are competing, and using the "%t" formatter in log4j2 to include the thread names in the log. In this case please attach the reproduction code without any production code change (only the new logging lines should be added; it is fine if the reproduction has to be retried 1000 times, as race conditions are flaky in nature, but I prefer the original production code) and attach the section of the logs where you think the race occurs.

@ahshahid
Author

ahshahid commented Mar 3, 2025

The provided unit test is invalid, as it uses multiple threads to process events whereas the production code has a single dedicated thread, so a race condition is not possible. (Apart from creating the right environment for FetchFailedException, the source code attempts to stop the executor on the host in case of an exception, so without source code changes, end-to-end reproduction in a single VM is very difficult for me.)

If you still think it is a race condition and you can reproduce the problem with a standalone test, I suggest adding log lines to those places where you think the two threads are competing, and using the "%t" formatter in log4j2 to include the thread names in the log. In this case please attach the reproduction code without any production code change (only the new logging lines should be added; it is fine if the reproduction has to be retried 1000 times, as race conditions are flaky in nature, but I prefer the original production code) and attach the section of the logs where you think the race occurs.

The end-to-end bug reproduction without production code changes is not feasible for me (at least at this point) in a single-VM unit test.

The race condition is possible (and happens) because in the DAGScheduler::handleTaskCompletion method, asynchronicity is introduced by the following snippet of code:
```
messageScheduler.schedule(
  new Runnable {
    override def run(): Unit = eventProcessLoop.post(ResubmitFailedStages)
  },
  DAGScheduler.RESUBMIT_TIMEOUT,
  TimeUnit.MILLISECONDS)
```
@ahshahid
Author

ahshahid commented Mar 3, 2025

Let me see if I can modify the test and expose the race without using multiple threads at the top level.

@attilapiros
Contributor

The provided unit test is invalid, as it uses multiple threads to process events whereas the production code has a single dedicated thread, so a race condition is not possible. (Apart from creating the right environment for FetchFailedException, the source code attempts to stop the executor on the host in case of an exception, so without source code changes, end-to-end reproduction in a single VM is very difficult for me.)
If you still think it is a race condition and you can reproduce the problem with a standalone test, I suggest adding log lines to those places where you think the two threads are competing, and using the "%t" formatter in log4j2 to include the thread names in the log. In this case please attach the reproduction code without any production code change (only the new logging lines should be added; it is fine if the reproduction has to be retried 1000 times, as race conditions are flaky in nature, but I prefer the original production code) and attach the section of the logs where you think the race occurs.

The end-to-end bug reproduction without production code changes is not feasible for me (at least at this point) in a single-VM unit test.

The race condition is possible (and happens) because in the DAGScheduler::handleTaskCompletion method, asynchronicity is introduced by the following snippet of code:

```
messageScheduler.schedule(
  new Runnable {
    override def run(): Unit = eventProcessLoop.post(ResubmitFailedStages)
  },
  DAGScheduler.RESUBMIT_TIMEOUT,
  TimeUnit.MILLISECONDS)
```

What we have here is a producer-consumer pattern.

The messageScheduler is a queue connecting the producer(s) to the consumer. Regarding the producer side we are fine: you can post from any number of threads (you can have multiple producers). But the processing of those events (the consumer side) is in question, if you assume the race condition is in one or more handle method calls where you introduced the locking (such as handleTaskCompletion, which is called from

private def doOnReceive(event: DAGSchedulerEvent): Unit = event match {

via

override def onReceive(event: DAGSchedulerEvent): Unit = {

), but that is single-threaded, because DAGSchedulerEventProcessLoop extends EventLoop, where the onReceive() method is called from a single thread:

```
private[spark] val eventThread = new Thread(name) {
  setDaemon(true)

  override def run(): Unit = {
    try {
      while (!stopped.get) {
        val event = eventQueue.take()
        try {
          onReceive(event)
        } catch {
          case NonFatal(e) =>
            try {
              onError(e)
            } catch {
              case NonFatal(e) => logError(log"Unexpected error in ${MDC(EVENT_LOOP, name)}", e)
            }
        }
      }
    } catch {
      case ie: InterruptedException => // exit even if eventQueue is not empty
      case NonFatal(e) => logError(log"Unexpected error in ${MDC(EVENT_LOOP, name)}", e)
    }
  }
}
```

The locking implementation itself also contains some bugs, but first let's focus on understanding the problem at hand.

… relying on concurrent processing of DagEvent, instead relying on concurrent addition of event to the event loop queue
@ahshahid
Author

ahshahid commented Mar 4, 2025


@attilapiros: Reworked the test to reproduce the race condition by relying on the behaviour of concurrent addition of DAG events to the event loop's queue.
To reproduce the race condition, the test does the following:

  1. The FetchFailed event handler, before submitting the "ResubmitFailedStages" event, ensures that the queue first receives a partition task completion event.
  2. This ensures that the partition task completion event is processed first, and only then is the ResubmitFailedStages event submitted.

The test reliably fails on current master.

I am also curious to know the issues you found with the usage of locks.

@ahshahid ahshahid requested a review from attilapiros March 4, 2025 19:18
@ahshahid
Author

ahshahid commented Mar 4, 2025

@attilapiros @squito,
Given that there is a guarantee that DAGScheduler::onReceive(event: DAGSchedulerEvent) is always invoked on the single thread of the EventLoop, and that in NO SITUATION can there be concurrency in the invocation of onReceive(event: DAGSchedulerEvent), I suppose the race can be fixed with the existing changes of the PR sans the locks.
I will modify the PR to remove the locks altogether.

@ahshahid ahshahid changed the title [SPARK-51272][CORE]. Fix for the race condition in Scheduler causing failure in retrying all partitions in case of indeterministic shuffle keys [WIP][SPARK-51272][CORE]. Fix for the race condition in Scheduler causing failure in retrying all partitions in case of indeterministic shuffle keys Mar 4, 2025
ashahid added 2 commits March 4, 2025 13:50
… always be in a single thread of EventLoop and that when ResubmitFailedStages event is submitted, though it is done by MessageScheduler ( a separate threadpool, but it is still doing post to the event loop's queue instead of direct onRecieve invocation, the read/write locks in stage are not needed
@ahshahid ahshahid changed the title [WIP][SPARK-51272][CORE]. Fix for the race condition in Scheduler causing failure in retrying all partitions in case of indeterministic shuffle keys [SPARK-51272][CORE]. Fix for the race condition in Scheduler causing failure in retrying all partitions in case of indeterministic shuffle keys Mar 4, 2025
@attilapiros
Contributor

@ahshahid as the solution became very different, please update the description in the PR accordingly.

@ahshahid
Author

ahshahid commented Mar 6, 2025

@ahshahid as the solution became very different, please update the description in the PR accordingly.

@attilapiros: Sincere thanks for the detailed review.

@ahshahid
Author

ahshahid commented Apr 7, 2025

@attilapiros: I am also not sure about what you meant when you said:
"But our solution is not enough as the ResultStage does not support revert/rollback so re-executing all the tasks may lead to incorrect result!

There is another issue created for this, please check out https://issues.apache.org/jira/browse/SPARK-25342."

Are you referring to Case 1 or Case 2?

I have explained why, in Case 1, the spurious result will not get committed.

So does that answer the above statement of yours?

@attilapiros
Contributor

I focused on your "SPARK-51272: retry all the partitions of result stage, if the first result task has failed and failing ShuffleMap stage is inDeterminate" test where the current solution is not enough as we need a proper revert.

What I am looking for is a simpler solution; could you please look into this:
https://github.com/attilapiros/spark/pull/8/files?diff=unified&w=1

This is a PR against my repo and it has two failing tests, but I am interested in your early opinion.

@attilapiros
Contributor

attilapiros commented Apr 8, 2025

Those two failing tests in my PR were coming from the DAGSchedulerSuite changes, so I reverted those.

@ahshahid
Author

ahshahid commented Apr 8, 2025 via email

@attilapiros
Contributor

Yes, they are correct.

That stage has two tasks: one is running on hostC and the other on hostD:

completeShuffleMapStageSuccessfully(1, 0, 2, Seq("hostC", "hostD"))

The fetch failure was from the host called hostC:

FetchFailed(makeBlockManagerId("hostC"), shuffleId2, 0L, 0, 0, "ignored"),

This caused an executor loss on hostC:

25/04/07 19:48:08.769 pool-1-thread-1-ScalaTest-running-DAGSchedulerSuite INFO DAGSchedulerSuite$MyDAGScheduler: Executor lost: hostC-exec (epoch 4)

So this removes the output which was made on hostC. This is how we get the assert right, but later, when ResubmitFailedStages is handled, the execution goes to submitMissingTasks() where all the output is removed:

mapOutputTracker.unregisterAllMapAndMergeOutput(sms.shuffleDep.shuffleId)

@ahshahid
Author

ahshahid commented Apr 8, 2025 via email

@attilapiros
Contributor

What about the retry of partitions? Because the failing stage is
indeterminate, all the partitions should be retried... isn't it?

...but later, when ResubmitFailedStages is handled, the execution goes to submitMissingTasks() where all the output is removed:

mapOutputTracker.unregisterAllMapAndMergeOutput(sms.shuffleDep.shuffleId)

@ahshahid
Author

ahshahid commented Apr 8, 2025 via email

@attilapiros
Contributor

Please check out https://github.com/attilapiros/spark/pull/8/files?diff=unified&w=1

Ignoring whitespace, the production code is just 13 lines added to DAGScheduler, and there is no new state introduced.
So the change is much simpler and easier to reason about.

@ahshahid
Author

ahshahid commented Apr 8, 2025 via email

@ahshahid
Author

ahshahid commented Apr 8, 2025 via email

@attilapiros
Contributor

No, even in the simplified version I just abort the stage when a revert would be needed.

@ahshahid
Author

ahshahid commented Apr 8, 2025 via email

@ahshahid
Author

ahshahid commented Apr 8, 2025 via email

@attilapiros
Contributor

If, say, it's the first result task which has failed and is being processed
by the event process thread, in this simplified PR, will there be a window /
situation where the query will be aborted?
I guess, yes?

I do not see any window there but you can add a test if you suspect any.

@ahshahid
Author

ahshahid commented Apr 8, 2025

@attilapiros: added a comment on your private PR.

Contributor

@mridulm mridulm left a comment


Took a quick look - will do a more in-depth analysis later.
I want to make sure I am not missing anything here.

```
  }
} else {
  mapOutputTracker.unregisterAllMapAndMergeOutput(
    mapStage.shuffleDep.shuffleId)
```
Contributor

@mridulm mridulm Apr 12, 2025

Add if/else for SHUFFLE_USE_OLD_FETCH_PROTOCOL.
Orthogonally, given we are in 4.0, perhaps it is time to drop SHUFFLE_USE_OLD_FETCH_PROTOCOL. Thoughts @attilapiros ?

```
    }

  case resultStage: ResultStage if resultStage.activeJob.isDefined =>
    val numMissingPartitions = resultStage.findMissingPartitions().length
    if (numMissingPartitions < resultStage.numTasks) {
      // TODO: support to rollback result tasks.
      abortStage(resultStage, generateErrorMessage(resultStage), None)
    } else {
      resultStage.markAllPartitionsMissing()
```
Contributor

(Here and in other places) If the ResultStage does not have any missing tasks, why are we failing it?

Author

We are at this line, as per my understanding, because the first task of the current stage (the result stage) has failed, and the ResultStage is directly or indirectly dependent on an indeterminate stage (that is, the ResultStage is itself an indeterminate type of stage). The call resultStage.markAllPartitionsMissing() sets a flag on this stage such that, while the stage is being resubmitted for re-execution (a separate thread adds it back to the event queue), any successful task arriving in that window for some other partition is rejected. If it were accepted, that would be the race, as it would otherwise result in a re-fetch of only some partitions (and not all).
This flag is checked at line 1881:
&& stage.isIndeterminate) || stage.shouldDiscardResult(task.stageAttemptId)

```
      resultStage.markAllPartitionsMissing()
    }

  case _ =>
```
Contributor

failedStage has already been marked as failed.

Given mapStage is not indeterminate, but only failedStage (the 'reducer' stage which is a ResultStage) is - we only need to recompute the failed partitions for it, not all partitions.

Author

This is the case where the ResultStage is failing due to its first task. The shuffle ID which caused the result stage to fail is determinate. But the ResultStage (in case of a join) is dependent on two shuffle stages, where the other shuffle stage is indeterminate (which makes the ResultStage also indeterminate).
Now, at this point it is not known whether the other indeterminate shuffle stage has also failed or not.
If it has also failed, and one of the result tasks is marked successful, or if not all partitions are retried, then we will get wrong results (the commented-out HA test can fail sporadically due to this problem).
So in this case as well, the behaviour needs to be the same as in the previous one (where the failing shuffle stage is indeterminate).

```
  }

  override def shouldDiscardResult(attemptId: Int): Boolean =
    this.discardResultsForAttemptId >= attemptId
```
Contributor

Assuming both use cases for markAllPartitionsMissing are not required (based on the discussion above), we can remove these changes.

Author

Unless we decide to aggressively abort the query even if it is the first task that is failing (and the ResultStage is indeterminate), this piece of code and the two above are needed, IMO, to avoid the race condition.

@attilapiros
Contributor

@mridulm IMHO, regarding an indeterminate result stage, we should abort the stage more aggressively, as we cannot re-execute any of its tasks twice: on the executor side, repeating the operation with different data can lead to corrupted results.

One example is using FileOutputCommitter. Here I cannot see any guarantees about re-executing a Hadoop task commit with different data.

The other, better example is writing to an external DB via JDBC:

Here you can see it iterates over the partitions and calls INSERT INTOs:

```
repartitionedDF.foreachPartition { iterator => savePartition(
  table, iterator, rddSchema, insertStmt, batchSize, dialect, isolationLevel, options)
}
```

where each INSERT INTO is SQL-dialect specific:

```
val insertStmt = getInsertStatement(table, rddSchema, tableSchema, isCaseSensitive, dialect)
```

So any re-execution will duplicate the data.

I think this is why https://issues.apache.org/jira/browse/SPARK-25342 is opened.

@mridulm WDYT?

@mridulm
Contributor

mridulm commented Apr 13, 2025

@attilapiros I am not entirely sure about the specific context of your comment :-)

If you mean this or this, then that is no different from how speculative tasks for indeterminate stage(s) behave: if all partitions had completed and committed, they don't need to be recomputed.

For the first, the map stage is determinate - so re-execution will not change the input data for the 'reducer' (though it can change the order) - same as speculative execution for a partition of this result stage.
For the second, the result stage has no missing partitions - so all tasks have completed successfully (similar to one of two tasks completing successfully under speculative execution and committing): I will need to see practically when this can actually happen (as the job should have been removed from the active job list due to result stage completion ... not sure if there is some corner-case race condition).

SPARK-25342 handles the case where a stage that needs to be re-executed is indeterminate - and so impacts the DAG of all child stages.

Do let me know if I am missing something!

(Note: Edited for clarity)

@attilapiros
Contributor

attilapiros commented Apr 13, 2025

For the #50033 (comment), map stage is determinate - so reexecution will not change input data for 'reducer' (though can change order) - same as spec execution for a partition of this result stage.

Even if the map stage is determinate, a fetch failure will lead to an executor loss, which can remove the map output of the indeterminate stage, and when the result stage is resubmitted the indeterminate parent will be detected as missing and will be resubmitted.

So when we have 3 stages:

  • ShuffleMapStage1 (hostA_exec, hostB_exec), determinate
  • ShuffleMapStage2 (hostA_exec, hostB_exec), indeterminate
  • ResultStage depending on ShuffleMapStage1 and ShuffleMapStage2

A FetchFailure when the ResultStage is fetching from ShuffleMapStage1 will lead to failing both ShuffleMapStage1 and the ResultStage, and even to removing the executor, thereby removing its map output as well. So when the ResultStage is resubmitted, its parent ShuffleMapStage2 will be missing too.

I believe this is tested as

test("SPARK-51272: retry all the partitions of result stage, if the first result task" +
" has failed and failing ShuffleMap stage is inDeterminate") {
this.dagSchedulerInterceptor = createDagInterceptorForSpark51272(
() => taskSets.find(_.shuffleId.isEmpty).get.tasks(1), "RELEASE_LATCH")
val numPartitions = 2
// The first shuffle stage is completed by the below function itself which creates two
// stages.
val (shuffleId1, shuffleId2) = constructTwoStages(
stage1InDeterminate = false,
stage2InDeterminate = true,
isDependencyBetweenStagesTransitive = false)
val shuffleStage1 = this.scheduler.shuffleIdToMapStage(shuffleId1)
val shuffleStage2 = this.scheduler.shuffleIdToMapStage(shuffleId2)
completeShuffleMapStageSuccessfully(shuffleStage2.id, 0, numPartitions)
val resultStage = scheduler.stageIdToStage(2).asInstanceOf[ResultStage]
val activeJob = resultStage.activeJob
assert(activeJob.isDefined)
// The result stage is still waiting for its 2 tasks to complete
assert(resultStage.findMissingPartitions() == Seq.tabulate(numPartitions)(i => i))
// The below event is going to initiate the retry of previous indeterminate stages, and also
// the retry of all result tasks. But before the "ResubmitFailedStages" event is added to the
// queue of Scheduler, a successful completion of the result partition task is added to the
// event queue. Due to scenario, the bug surfaces where instead of retry of all partitions
// of result tasks (2 tasks in total), only some (1 task) get retried
runEvent(
makeCompletionEvent(
taskSets.find(_.stageId == resultStage.id).get.tasks(0),
FetchFailed(makeBlockManagerId("hostA"), shuffleId1, 0L, 0, 0, "ignored"),
null))
import org.scalatest.concurrent.Eventually._
import org.scalatest.matchers.should.Matchers._
import org.scalatest.time.SpanSugar._
eventually(timeout(3.minutes), interval(500.milliseconds)) {
shuffleStage1.latestInfo.attemptNumber() should equal(1)
}
completeShuffleMapStageSuccessfully(shuffleStage1.id, 1, numPartitions)
eventually(timeout(3.minutes), interval(500.milliseconds)) {
shuffleStage2.latestInfo.attemptNumber() should equal(1)
}
completeShuffleMapStageSuccessfully(shuffleStage2.id, 1, numPartitions)
eventually(timeout(3.minutes), interval(500.milliseconds)) {
resultStage.latestInfo.attemptNumber() should equal(1)
}
org.scalatest.Assertions.assert(resultStage.latestInfo.numTasks == numPartitions)
org.scalatest.Assertions.assert(resultStage.findMissingPartitions().size == numPartitions)
}

Despite the test name saying "failing ShuffleMap stage is inDeterminate", the shuffleId1 used in the fetch failure belongs to the determinate stage.

If you mean #50033 (comment) or #50033 (comment), then that is no different from how speculative tasks for indeterminate stage(s) behave: if all partitions had completed and committed, they don't need to be recomputed.

Can it be that in this case even speculative tasks lead to errors when writing to JDBC? Repeated INSERT INTOs, in the best case (when the data is the same), simply duplicate the data. If this is documented, that is fine, as with primary keys on the target schema this can be detected.

@mridulm
Contributor

mridulm commented Apr 14, 2025

Even if the map stage is determinate, a fetch failure will lead to an executor loss, which can remove the map output of the indeterminate stage, and when the result stage is resubmitted the indeterminate parent will be detected as missing and will be resubmitted.

The indeterminate stage here is a ResultStage - so the scenario described does not apply (no shuffle output).
We do need to ensure committed task output is handled properly, which is similar to the case of spec exec / node loss / task re-execution due to non-shuffle fetch failures / ...

For the case of a shuffle map stage, submitMissingTasks does clear the shuffle output of the stage when a new attempt is resubmitted (the first match statement).
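For reference, that first match in submitMissingTasks is roughly the following (paraphrased and abridged from memory of current master; the exact code may differ slightly):

```
stage match {
  case sms: ShuffleMapStage if stage.isIndeterminate && !sms.isAvailable =>
    mapOutputTracker.unregisterAllMapAndMergeOutput(sms.shuffleDep.shuffleId)
  case _ =>
}
```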

With that out of the way, let us look at the scenario described:

So when we have 3 stages:

  • ShuffleMapStage1 (hostA_exec, hostB_exec), determinate
  • ShuffleMapStage2 (hostA_exec, hostB_exec), indeterminate
  • ResultStage depending on ShuffleMapStage1 and ShuffleMapStage2

A FetchFailure when ResultStage is fetching from ShuffleMapStage1 will lead to failing both ShuffleMapStage1 and ResultStage and even removing the executor so removing the map output as well. So when ResultStage is resubmitted its parent ShuffleMapStage2 will be missing too.

There are a bunch of cases here, and we will need to analyze their impact. I am focusing on two main cases:

For the common case, when the ResultStage is re-executed, it will result in a FetchFailure when fetching the output of ShuffleMapStage2, which then results in aborting the job by the existing fetch failure handling code (this boils down to the simple case of an existing indeterminate shuffle-map-stage -> result-stage, with the result stage having completed tasks).

Having said that:

I will need to recheck whether handleExecutorLost handles the impact on an indeterminate stage properly: but from a cursory read, it is likely that the scenario you described, @attilapiros, is not handled there?

That is, if fileLost == true, we might want to do something similar to what we are doing in handleTaskCompletion when there is a fetch failure ?
If yes, we could be more aggressive when handling this case -

  • Invalidate all downstream shuffle output
  • Any result stage which has/had started, and not completed - fail that job.

Does this align with your observations/analysis @attilapiros ?

@mridulm
Contributor

mridulm commented Apr 14, 2025

Specifically about JDBC - assuming it is not due to the case we discussed above - I am not entirely sure :-)
If the commit protocol has been correctly implemented, we will need to understand that better ...

@attilapiros
Contributor

If yes, we could be more aggressive when handling this case -

Invalidate all downstream shuffle output
Any result stage which has/had started, and not completed - fail that job.
Does this align with your observations/analysis @attilapiros ?

We are getting closer.

Specifically about JDBC - assuming it is not due to the case we discussed above - I am not entirely sure :-)
If the commit protocol has been correctly implemented, we will need to understand that better ...

There is transaction management for writing the rows of a partition (so for a task):

```
if (supportsTransactions) {
  conn.setAutoCommit(false) // Everything in the same db transaction.
  conn.setTransactionIsolation(finalIsolationLevel)
}
```

But the re-execution of a task will cause duplication, as I see it.

@mridulm
Contributor

mridulm commented Apr 15, 2025

Just checked the JDBC integration you referenced, @attilapiros; the issue is that we are always committing when all rows are done processing. Instead, we should commit the transaction only as part of the commit protocol - when the driver lets the task know it can commit.

This will be an issue even without any determinism issue, for example when there is speculative execution - as a race condition.
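A hedged sketch of that idea in plain Scala against java.sql (driverAuthorizesCommit is a hypothetical stand-in for asking the commit protocol / output commit coordinator whether this task attempt may commit; it is not an existing Spark API):

```
import java.sql.Connection

object JdbcCommitSketch {
  // Write a partition inside one transaction, but commit only if the driver-side commit
  // protocol authorizes this task attempt; otherwise roll back, so a re-executed or
  // speculative attempt cannot duplicate rows.
  def savePartition(
      conn: Connection,
      rows: Iterator[Seq[Any]],
      insert: (Connection, Seq[Any]) => Unit,
      driverAuthorizesCommit: () => Boolean): Unit = {
    conn.setAutoCommit(false) // everything in the same DB transaction
    try {
      rows.foreach(row => insert(conn, row))
      if (driverAuthorizesCommit()) conn.commit() else conn.rollback()
    } catch {
      case e: Throwable =>
        conn.rollback()
        throw e
    }
  }
}
```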

@attilapiros
Contributor

@mridulm

This will be an issue even without any determinism issue, for example when there is speculative execution - as a race condition.

But do we want one more situation where it breaks?

And my other concern is Hadoop's file output committer: I cannot see any guarantees about what will happen if a Hadoop task is re-committed with different data (because of indeterminism we might have different data).

… for each of shuffle stages so that stages are not unnecessarily retried because of shared host names
@mridulm
Contributor

mridulm commented Apr 16, 2025

@attilapiros the way to handle this in Spark is to leverage the output committer / commit protocol, based on what is described in your observations and analysis.
There are a number of fault modes we get exposed to if we do not adhere to it - including assumptions the Spark scheduler makes.

And my other concern is Hadoop's file output committer: I cannot see any guarantees about what will happen if a Hadoop task is re-committed with different data (because of indeterminism we might have different data).

You cannot 're-commit' with different data: this is why the result stage is failed in case of indeterminism.
