[CELEBORN-1496] Differentiate map results with only different stageAttemptId #2609
Conversation
@@ -66,6 +66,26 @@ public class SparkShuffleManager implements ShuffleManager {
   private ExecutorShuffleIdTracker shuffleIdTracker = new ExecutorShuffleIdTracker();

   public SparkShuffleManager(SparkConf conf, boolean isDriver) {
     int maxStageAttempts =
         conf.getInt(
             "spark.stage.maxConsecutiveAttempts",
"spark.stage.maxConsecutiveAttempts" becomes a variable of the config package through apache/spark#42061, so all Spark 2 versions and some Spark 3 versions cannot read this config through that config-package variable; they have to look it up by its string key instead.
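The constructor reads the stage retry limit because the encoding this PR introduces packs the stage attempt id into 16 bits, so misconfigured retry limits must be rejected up front. A hypothetical standalone sketch of that kind of guard (the class name, bound constant, and message are invented for illustration, not Celeborn's actual code):

```java
// Illustrative sketch: reject retry limits whose stage attempt id could not
// be packed into the high 16 bits of the encoded attempt number.
public class AttemptLimitCheck {
    // 16 bits are reserved for the stage attempt id in the encoded number.
    static final int MAX_ENCODABLE_ATTEMPTS = 1 << 16;

    static void validate(int maxConsecutiveStageAttempts) {
        if (maxConsecutiveStageAttempts >= MAX_ENCODABLE_ATTEMPTS) {
            throw new IllegalArgumentException(
                "spark.stage.maxConsecutiveAttempts = " + maxConsecutiveStageAttempts
                    + " cannot be encoded in 16 bits");
        }
    }

    public static void main(String[] args) {
        validate(4); // Spark's default of 4 consecutive attempts passes
    }
}
```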
When throwsFetchFailure is enabled this should be handled - I would suggest setting celeborn.client.spark.fetch.throwsFetchFailure to true and trying. That flag should fix this issue (as well as allow recomputation of lost shuffle data!).
If there are specific reasons why it can't be enabled (since it is still false by default!) - I would suggest:
a) work with the Spark community to enforce this limit.
b) Once (a) is done, scope the change to when throwsFetchFailure is false.
LGTM except minor change, thanks!
@@ -297,4 +297,45 @@ class CelebornFetchFailureSuite extends AnyFunSuite
     sparkSession.stop()
   }
 }

  test("celeborn spark integration test - resubmit an unordered barrier stage") {
This specific test will pass when we support barrier stages (even without the changes in this PR).
@RexXiong's changes did reproduce the issue - perhaps adapt it here ?
### What changes were proposed in this pull request?
Adds support for barrier stages. This involves two aspects:
a) If there is a task failure when executing a barrier stage, all shuffle output for the stage attempt is discarded and ignored.
b) If there is a reexecution of a barrier stage (for example, due to a child stage getting a fetch failure), all shuffle output for the previous stage attempt is discarded and ignored.
This is similar to the handling of indeterminate stages when `throwsFetchFailure` is `true`. Note that this is supported only when `spark.celeborn.client.spark.fetch.throwsFetchFailure` is `true`.

### Why are the changes needed?
As detailed in CELEBORN-1518, Celeborn currently does not support barrier stages, an essential piece of Apache Spark functionality that is widely used by Spark users. Supporting them will allow Celeborn to serve a wider set of Spark users.

### Does this PR introduce _any_ user-facing change?
Adds the ability for Celeborn to support Apache Spark barrier stages.

### How was this patch tested?
Existing tests, and additional tests (thanks to jiang13021 in #2609 - [see here](https://github.com/apache/celeborn/pull/2609/files#diff-e17f15fcca26ddfc412f0af159c784d72417b0f22598e1b1ebfcacd6d4c3ad35))

Closes #2639 from mridulm/fix-barrier-stage-reexecution.

Lead-authored-by: Mridul Muralidharan <mridul@gmail.com>
Co-authored-by: Mridul Muralidharan <mridulatgmail.com>
Signed-off-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>
(cherry picked from commit 3234bef)
Signed-off-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>
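The barrier-stage handling described above amounts to tracking shuffle output per stage attempt and invalidating everything from earlier attempts when the stage is rerun. A purely illustrative sketch of that bookkeeping (all names here are invented, not the Celeborn implementation):

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Illustrative tracker: shuffle output is keyed by stage attempt id, and a
// rerun of a barrier stage discards everything from earlier attempts so
// readers can never mix data from two attempts.
public class BarrierAttemptOutputs {
    // stage attempt id -> map partition ids whose output was committed
    private final Map<Integer, Set<Integer>> outputsByAttempt = new HashMap<>();

    void recordOutput(int stageAttemptId, int mapPartitionId) {
        outputsByAttempt
            .computeIfAbsent(stageAttemptId, k -> new HashSet<>())
            .add(mapPartitionId);
    }

    // On a barrier stage rerun, drop all output from previous attempts.
    void discardPreviousAttempts(int newStageAttemptId) {
        outputsByAttempt.keySet().removeIf(attempt -> attempt < newStageAttemptId);
    }

    Set<Integer> liveOutputs() {
        Set<Integer> all = new HashSet<>();
        outputsByAttempt.values().forEach(all::addAll);
        return all;
    }

    public static void main(String[] args) {
        BarrierAttemptOutputs tracker = new BarrierAttemptOutputs();
        tracker.recordOutput(0, 1);
        tracker.recordOutput(0, 2);
        tracker.discardPreviousAttempts(1); // attempt 0's output is discarded
        tracker.recordOutput(1, 3);
        System.out.println(tracker.liveOutputs()); // only attempt 1's output survives
    }
}
```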
Codecov Report
Attention: Patch coverage is

@@ Coverage Diff @@
##             main    #2609      +/-   ##
==========================================
- Coverage   39.83%   33.32%   -6.50%
==========================================
  Files         239      310      +71
  Lines       15026    18219    +3193
  Branches     1362     1673     +311
==========================================
+ Hits         5984     6070      +86
- Misses       8711    11809    +3098
- Partials      331      340       +9
Looks reasonable to me.
+CC @waitinfuture and @RexXiong as well.
My only hesitation is whether we can generalize how we are handling testRandomPushForStageRerun ... it is hyper specific to that test.
  }

  public static int getEncodedAttemptNumber(TaskContext context) {
    return (context.stageAttemptNumber() << 16) | context.attemptNumber();
As we discussed earlier (I can't seem to find the ref :-) ) - please do submit a PR to Apache Spark as well for this - and ensure the communities can align on this assumption.
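getEncodedAttemptNumber packs the stage attempt id into the high 16 bits and the task attempt number into the low 16 bits. A standalone sketch of that encoding and its inverse (the class and decode helpers are illustrative, not Celeborn APIs):

```java
// Illustrative demo of the 16-bit packing used to differentiate map results
// that differ only in stageAttemptId.
public class EncodedAttemptDemo {
    static int encode(int stageAttemptNumber, int taskAttemptNumber) {
        // high 16 bits: stage attempt, low 16 bits: task attempt
        return (stageAttemptNumber << 16) | taskAttemptNumber;
    }

    static int stageAttempt(int encoded) {
        return encoded >>> 16; // unsigned shift recovers the stage attempt
    }

    static int taskAttempt(int encoded) {
        return encoded & 0xFFFF; // mask recovers the task attempt
    }

    public static void main(String[] args) {
        int encoded = encode(2, 5);
        System.out.println(encoded);               // 2 * 65536 + 5 = 131077
        System.out.println(stageAttempt(encoded)); // 2
        System.out.println(taskAttempt(encoded));  // 5
    }
}
```

Because both fields share one int, each must stay below 2^16 - which is why the conf check on spark.stage.maxConsecutiveAttempts matters.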
    if (testRandomPushForStageRerun && shuffleId == 0 && !alreadyReadChunk) {
      alreadyReadChunk = true;
    } else if (testRandomPushForStageRerun && shuffleId == 0 && alreadyReadChunk) {
      alreadyReadChunk = false;
      throw new CelebornIOException("already read chunk");
    }
super nit:

    if (testRandomPushForStageRerun) {
      if (shuffleId == 0 && !alreadyReadChunk) {
        alreadyReadChunk = true;
      } else if (shuffleId == 0 && alreadyReadChunk) {
        alreadyReadChunk = false;
        throw new CelebornIOException("already read chunk");
      }
    }
Agree. IMO we should eliminate this test from the standard read/write code. We only need to ensure that a different attempt ID is used for shuffle writes, as distinguishing the output data with different attempt IDs aligns with our previous approach.
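One way to take the test-only branch out of the standard read path, as suggested above, is a pluggable hook: production code calls a no-op, and only the stage-rerun test installs a failing implementation. A purely hypothetical sketch (the hook interface and names are invented, not Celeborn APIs; plain IOException stands in for CelebornIOException):

```java
import java.io.IOException;

public class FaultInjectionDemo {
    // Hypothetical extension point invoked before each chunk read.
    interface ChunkReadHook {
        void beforeChunkRead(int shuffleId) throws IOException;
    }

    // Production default: does nothing, so the read path stays test-free.
    static final ChunkReadHook NOOP = shuffleId -> { };

    // Test-only hook: fail the second read of shuffle 0 to force a stage rerun.
    static class FailOnceHook implements ChunkReadHook {
        private boolean alreadyReadChunk = false;

        @Override
        public void beforeChunkRead(int shuffleId) throws IOException {
            if (shuffleId != 0) {
                return;
            }
            if (!alreadyReadChunk) {
                alreadyReadChunk = true;
            } else {
                alreadyReadChunk = false;
                throw new IOException("already read chunk");
            }
        }
    }

    public static void main(String[] args) throws IOException {
        ChunkReadHook hook = new FailOnceHook();
        hook.beforeChunkRead(0); // first read succeeds
        try {
            hook.beforeChunkRead(0); // second read fails, triggering a rerun
        } catch (IOException e) {
            System.out.println("injected failure: " + e.getMessage());
        }
    }
}
```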
LGTM, thanks! Merged to main (v0.6.0) and branch-0.5 (v0.5.2).
…temptId

### What changes were proposed in this pull request?
Let attemptNumber = (stageAttemptId << 16) | taskAttemptNumber, to differentiate map results with only different stageAttemptId.

### Why are the changes needed?
If we can't differentiate map tasks with only different stageAttemptId, it may lead to mixed reading of two map tasks' shuffle write batches during shuffle read, causing a data correctness issue.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Add ut: org.apache.spark.shuffle.celeborn.SparkShuffleManagerSuite#testWrongSparkConf_MaxAttemptLimit

Closes #2609 from jiang13021/spark_stage_attempt_id.

Lead-authored-by: jiang13021 <jiangyanze.jyz@antgroup.com>
Co-authored-by: Fu Chen <cfmcgrady@gmail.com>
Co-authored-by: Shuang <lvshuang.xjs@alibaba-inc.com>
Signed-off-by: Shuang <lvshuang.xjs@alibaba-inc.com>
(cherry picked from commit 3853075)
Signed-off-by: Shuang <lvshuang.xjs@alibaba-inc.com>
…ageAttemptId backport #2609 to branch-0.4

Closes #2717 from cfmcgrady/CELEBORN-1496-branch-0.4.
Authored-by: jiang13021 <jiangyanze.jyz@antgroup.com>
Signed-off-by: Fu Chen <cfmcgrady@gmail.com>
### What changes were proposed in this pull request?
Introduce the `spark-3.5-columnar-shuffle` module to support columnar shuffle for Spark 3.5. Follow up to #2710, #2609.

### Why are the changes needed?
Tests of `CelebornColumnarShuffleReaderSuite` fail after the changes of #2609.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
`CelebornColumnarShuffleReaderSuite`

Closes #2726 from SteNicholas/CELEBORN-912.

Authored-by: SteNicholas <programgeek@163.com>
Signed-off-by: xiyu.zk <xiyu.zk@alibaba-inc.com>