[CELEBORN-1496] Differentiate map results with only different stageAttemptId #2609

Closed
jiang13021 wants to merge 13 commits into apache:main from jiang13021:spark_stage_attempt_id

Conversation

jiang13021
Contributor

What changes were proposed in this pull request?

Let attemptNumber = (stageAttemptId << 16) | taskAttemptNumber to differentiate map results that differ only in stageAttemptId.
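
For illustration, a minimal sketch of this packing scheme (the class and method names here are illustrative only; the PR's actual helper, `getEncodedAttemptNumber`, appears later in the diff):

```java
// Sketch of the attempt-number packing described above (illustrative only).
// The high 16 bits carry the stage attempt, the low 16 bits the task attempt,
// so both values must stay below 65536 for the encoding to be lossless.
public final class AttemptEncodingSketch {
  static int encode(int stageAttemptId, int taskAttemptNumber) {
    return (stageAttemptId << 16) | taskAttemptNumber;
  }

  static int stageAttemptOf(int encoded) {
    return encoded >>> 16; // recover the stage attempt
  }

  static int taskAttemptOf(int encoded) {
    return encoded & 0xFFFF; // recover the task attempt
  }

  public static void main(String[] args) {
    int encoded = encode(1, 3); // stage attempt 1, task attempt 3
    System.out.println(encoded + " -> stage " + stageAttemptOf(encoded)
        + ", task " + taskAttemptOf(encoded)); // prints: 65539 -> stage 1, task 3
  }
}
```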

Why are the changes needed?

If we can't differentiate map tasks that differ only in stageAttemptId, shuffle read may mix the shuffle write batches of two map task attempts, causing data correctness issues.

Does this PR introduce any user-facing change?

No

How was this patch tested?

Add ut: org.apache.spark.shuffle.celeborn.SparkShuffleManagerSuite#testWrongSparkConf_MaxAttemptLimit
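
A hedged sketch of the kind of validation this test presumably exercises (the default values, the `spark.task.maxFailures` check, and the error message are assumptions, not the PR's literal code); the actual constructor change appears in the diff below:

```java
import org.apache.spark.SparkConf;

// Sketch only: read the limits by their string keys (which works on Spark 2.x
// and older 3.x, where no config constant is exposed) and reject values that
// would overflow the 16-bit fields of (stageAttemptId << 16) | taskAttemptNumber.
final class AttemptLimitCheckSketch {
  static void validate(SparkConf conf) {
    int maxStageAttempts = conf.getInt("spark.stage.maxConsecutiveAttempts", 4);
    int maxTaskAttempts = conf.getInt("spark.task.maxFailures", 4);
    if (maxStageAttempts >= (1 << 16) || maxTaskAttempts >= (1 << 16)) {
      throw new IllegalArgumentException(
          "Stage and task attempt limits must each fit into 16 bits, "
              + "otherwise encoded attempt numbers become ambiguous.");
    }
  }
}
```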

@@ -66,6 +66,26 @@ public class SparkShuffleManager implements ShuffleManager {
private ExecutorShuffleIdTracker shuffleIdTracker = new ExecutorShuffleIdTracker();

public SparkShuffleManager(SparkConf conf, boolean isDriver) {
int maxStageAttempts =
conf.getInt(
"spark.stage.maxConsecutiveAttempts",
jiang13021 (Contributor, author) commented on Jul 9, 2024:

`spark.stage.maxConsecutiveAttempts` only became a constant in Spark's `config` package via apache/spark#42061, so no Spark 2.x release and only some Spark 3.x releases expose it there; the config therefore has to be read by its string key.

mridulm (Contributor) left a comment:

When throwsFetchFailure is enabled this should be handled - I would suggest setting celeborn.client.spark.fetch.throwsFetchFailure to true and trying.

That flag should fix this issue (as well as allow recomputation of lost shuffle data!).

If there are specific reasons why it can't be enabled (since it is still false by default!), I would suggest:
a) work with the Spark community to enforce this limit.
b) once (a) is done, scope the change to when throwsFetchFailure is false.
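
For reference, enabling that flag is a one-line Spark configuration change; a minimal sketch (the `spark.`-prefixed key matches the commit message quoted further down):

```java
import org.apache.spark.SparkConf;

// Minimal sketch: turn on fetch-failure propagation so Spark can recompute
// lost shuffle data instead of consuming output from a stale stage attempt.
public final class EnableFetchFailureSketch {
  public static SparkConf apply(SparkConf conf) {
    return conf.set("spark.celeborn.client.spark.fetch.throwsFetchFailure", "true");
  }
}
```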

waitinfuture (Contributor) left a comment:

LGTM except for a minor change, thanks!

@@ -297,4 +297,45 @@ class CelebornFetchFailureSuite extends AnyFunSuite
sparkSession.stop()
}
}

test("celeborn spark integration test - resubmit an unordered barrier stage") {
Contributor:

This specific test will pass when we support barrier stages (even without the changes in this PR).
@RexXiong's changes did reproduce the issue - perhaps adapt it here?

waitinfuture pushed a commit that referenced this pull request Aug 12, 2024
### What changes were proposed in this pull request?

Adds support for barrier stages.
This involves two aspects:
a) If there is a task failure while executing a barrier stage, all shuffle output for that stage attempt is discarded and ignored.
b) If a barrier stage is re-executed (for example, because a child stage hit a fetch failure), all shuffle output for the previous stage attempt is discarded and ignored.

This is similar to handling of indeterminate stages when `throwsFetchFailure` is `true`.

Note that this is supported only when `spark.celeborn.client.spark.fetch.throwsFetchFailure` is `true`.

### Why are the changes needed?

As detailed in CELEBORN-1518, Celeborn currently does not support barrier stages, which are essential Apache Spark functionality in wide use by Spark users.
Enhancing Celeborn will allow its use by a wider set of Spark users.

### Does this PR introduce _any_ user-facing change?

Adds ability for Celeborn to support Apache Spark Barrier stages.

### How was this patch tested?

Existing tests, and additional tests (thanks to jiang13021 in #2609 - [see here](https://github.com/apache/celeborn/pull/2609/files#diff-e17f15fcca26ddfc412f0af159c784d72417b0f22598e1b1ebfcacd6d4c3ad35))

Closes #2639 from mridulm/fix-barrier-stage-reexecution.

Lead-authored-by: Mridul Muralidharan <mridul@gmail.com>
Co-authored-by: Mridul Muralidharan <mridulatgmail.com>
Signed-off-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>
waitinfuture pushed a commit that referenced this pull request Aug 12, 2024
Adds support for barrier stages.
This involves two aspects:
a) If there is a task failure while executing a barrier stage, all shuffle output for that stage attempt is discarded and ignored.
b) If a barrier stage is re-executed (for example, because a child stage hit a fetch failure), all shuffle output for the previous stage attempt is discarded and ignored.

This is similar to handling of indeterminate stages when `throwsFetchFailure` is `true`.

Note that this is supported only when `spark.celeborn.client.spark.fetch.throwsFetchFailure` is `true`.

As detailed in CELEBORN-1518, Celeborn currently does not support barrier stages, which are essential Apache Spark functionality in wide use by Spark users.
Enhancing Celeborn will allow its use by a wider set of Spark users.

Adds ability for Celeborn to support Apache Spark Barrier stages.

Existing tests, and additional tests (thanks to jiang13021 in #2609 - [see here](https://github.com/apache/celeborn/pull/2609/files#diff-e17f15fcca26ddfc412f0af159c784d72417b0f22598e1b1ebfcacd6d4c3ad35))

Closes #2639 from mridulm/fix-barrier-stage-reexecution.

Lead-authored-by: Mridul Muralidharan <mridul@gmail.com>
Co-authored-by: Mridul Muralidharan <mridulatgmail.com>
Signed-off-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>
(cherry picked from commit 3234bef)
Signed-off-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>
jiang13021 force-pushed the spark_stage_attempt_id branch from 069af3b to c0243d0 on August 12, 2024 07:44

codecov bot commented Aug 12, 2024

Codecov Report

Attention: Patch coverage is 83.33333% with 1 line in your changes missing coverage. Please review.

Project coverage is 33.32%. Comparing base (ea6617c) to head (b0ac8a7).
Report is 22 commits behind head on main.

| Files | Patch % | Lines |
| --- | --- | --- |
| ...cala/org/apache/celeborn/common/CelebornConf.scala | 83.34% | 1 Missing ⚠️ |
Additional details and impacted files
@@            Coverage Diff             @@
##             main    #2609      +/-   ##
==========================================
- Coverage   39.83%   33.32%   -6.50%     
==========================================
  Files         239      310      +71     
  Lines       15026    18219    +3193     
  Branches     1362     1673     +311     
==========================================
+ Hits         5984     6070      +86     
- Misses       8711    11809    +3098     
- Partials      331      340       +9     


mridulm (Contributor) left a comment:

Looks reasonable to me.

+CC @waitinfuture and @RexXiong as well.

My only hesitation is whether we can generalize how we are handling testRandomPushForStageRerun ... it is hyper-specific to that test.

}

public static int getEncodedAttemptNumber(TaskContext context) {
return (context.stageAttemptNumber() << 16) | context.attemptNumber();
Contributor:

As we discussed earlier (I can't seem to find the ref :-) ), please do submit a PR to Apache Spark as well for this, and ensure the communities can align on this assumption.
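
For context, the assumption in question is that both components fit in 16 bits; the arithmetic below (illustrative only, not the PR's code) shows why an out-of-range task attempt number would make two distinct attempts collide:

```java
// Illustrative only: if a task attempt number ever exceeded 0xFFFF, it would
// bleed into the stage-attempt bits and two distinct (stageAttempt, taskAttempt)
// pairs would encode to the same value.
public final class EncodingCollisionDemo {
  static int encode(int stageAttempt, int taskAttempt) {
    return (stageAttempt << 16) | taskAttempt;
  }

  public static void main(String[] args) {
    // (stage 0, task 65536) and (stage 1, task 0) both encode to 65536.
    System.out.println(encode(0, 65536) == encode(1, 0)); // prints: true
  }
}
```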

Comment on lines 396 to 401
if (testRandomPushForStageRerun && shuffleId == 0 && !alreadyReadChunk) {
  alreadyReadChunk = true;
} else if (testRandomPushForStageRerun && shuffleId == 0 && alreadyReadChunk) {
  alreadyReadChunk = false;
  throw new CelebornIOException("already read chunk");
}
Contributor:

super nit:

Suggested change
if (testRandomPushForStageRerun && shuffleId == 0 && !alreadyReadChunk) {
  alreadyReadChunk = true;
} else if (testRandomPushForStageRerun && shuffleId == 0 && alreadyReadChunk) {
  alreadyReadChunk = false;
  throw new CelebornIOException("already read chunk");
}
if (testRandomPushForStageRerun) {
  if (shuffleId == 0 && !alreadyReadChunk) {
    alreadyReadChunk = true;
  } else if (shuffleId == 0 && alreadyReadChunk) {
    alreadyReadChunk = false;
    throw new CelebornIOException("already read chunk");
  }
}

RexXiong (Contributor) commented:

> Looks reasonable to me.
>
> +CC @waitinfuture and @RexXiong as well.
>
> My only hesitation is whether we can generalize how we are handling testRandomPushForStageRerun ... it is hyper-specific to that test.

Agree. IMO we should eliminate this test logic from the standard read/write code. We only need to ensure that a different attempt ID is used for shuffle writes, since distinguishing output data by attempt ID aligns with our previous approach.
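
One hypothetical way to keep such fault injection out of the production read path (the interface and names below are invented for illustration; this is not Celeborn's API):

```java
import java.io.IOException;

// Hypothetical sketch: a test-only hook the reader calls before fetching a
// chunk, so the production code path carries no test-specific branches.
@FunctionalInterface
interface ChunkReadInterceptor {
  void beforeChunkRead(int shuffleId) throws IOException;

  // Production default: do nothing.
  ChunkReadInterceptor NOOP = shuffleId -> { };
}
```

A test could then install an interceptor that throws on the second read of shuffle 0, reproducing the stage-rerun scenario without a `testRandomPushForStageRerun` branch inside the reader itself.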

RexXiong (Contributor) left a comment:

LGTM, thanks! Merging to main (v0.6.0) and branch-0.5 (v0.5.2).

@RexXiong RexXiong closed this in 3853075 Aug 30, 2024
RexXiong added a commit that referenced this pull request Aug 30, 2024
…temptId

### What changes were proposed in this pull request?
Let attemptNumber = (stageAttemptId << 16) | taskAttemptNumber to differentiate map results that differ only in stageAttemptId.

### Why are the changes needed?
If we can't differentiate map tasks that differ only in stageAttemptId, shuffle read may mix the shuffle write batches of two map task attempts, causing data correctness issues.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Add ut: org.apache.spark.shuffle.celeborn.SparkShuffleManagerSuite#testWrongSparkConf_MaxAttemptLimit

Closes #2609 from jiang13021/spark_stage_attempt_id.

Lead-authored-by: jiang13021 <jiangyanze.jyz@antgroup.com>
Co-authored-by: Fu Chen <cfmcgrady@gmail.com>
Co-authored-by: Shuang <lvshuang.xjs@alibaba-inc.com>
Signed-off-by: Shuang <lvshuang.xjs@alibaba-inc.com>
(cherry picked from commit 3853075)
Signed-off-by: Shuang <lvshuang.xjs@alibaba-inc.com>
cfmcgrady added a commit to cfmcgrady/incubator-celeborn that referenced this pull request Aug 30, 2024
…temptId

Let attemptNumber = (stageAttemptId << 16) | taskAttemptNumber to differentiate map results that differ only in stageAttemptId.

If we can't differentiate map tasks that differ only in stageAttemptId, shuffle read may mix the shuffle write batches of two map task attempts, causing data correctness issues.

No

Add ut: org.apache.spark.shuffle.celeborn.SparkShuffleManagerSuite#testWrongSparkConf_MaxAttemptLimit

Closes apache#2609 from jiang13021/spark_stage_attempt_id.

Lead-authored-by: jiang13021 <jiangyanze.jyz@antgroup.com>
Co-authored-by: Fu Chen <cfmcgrady@gmail.com>
Co-authored-by: Shuang <lvshuang.xjs@alibaba-inc.com>
Signed-off-by: Shuang <lvshuang.xjs@alibaba-inc.com>
zaynt4606 pushed a commit to zaynt4606/celeborn that referenced this pull request Sep 2, 2024
…temptId

### What changes were proposed in this pull request?
Let attemptNumber = (stageAttemptId << 16) | taskAttemptNumber to differentiate map results that differ only in stageAttemptId.

### Why are the changes needed?
If we can't differentiate map tasks that differ only in stageAttemptId, shuffle read may mix the shuffle write batches of two map task attempts, causing data correctness issues.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Add ut: org.apache.spark.shuffle.celeborn.SparkShuffleManagerSuite#testWrongSparkConf_MaxAttemptLimit

Closes apache#2609 from jiang13021/spark_stage_attempt_id.

Lead-authored-by: jiang13021 <jiangyanze.jyz@antgroup.com>
Co-authored-by: Fu Chen <cfmcgrady@gmail.com>
Co-authored-by: Shuang <lvshuang.xjs@alibaba-inc.com>
Signed-off-by: Shuang <lvshuang.xjs@alibaba-inc.com>
cfmcgrady pushed a commit that referenced this pull request Sep 2, 2024
…ageAttemptId

backport #2609 to branch-0.4

### What changes were proposed in this pull request?

Let attemptNumber = (stageAttemptId << 16) | taskAttemptNumber to differentiate map results that differ only in stageAttemptId.

### Why are the changes needed?

If we can't differentiate map tasks that differ only in stageAttemptId, shuffle read may mix the shuffle write batches of two map task attempts, causing data correctness issues.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Add ut: org.apache.spark.shuffle.celeborn.SparkShuffleManagerSuite#testWrongSparkConf_MaxAttemptLimit

Closes #2609 from jiang13021/spark_stage_attempt_id.

Lead-authored-by: jiang13021 <jiangyanze.jyzantgroup.com>

Closes #2717 from cfmcgrady/CELEBORN-1496-branch-0.4.

Authored-by: jiang13021 <jiangyanze.jyz@antgroup.com>
Signed-off-by: Fu Chen <cfmcgrady@gmail.com>
kerwin-zk pushed a commit that referenced this pull request Sep 5, 2024
### What changes were proposed in this pull request?

Introduce `spark-3.5-columnar-shuffle` module to support columnar shuffle for Spark 3.5.

Follow-up to #2710 and #2609.

### Why are the changes needed?

Tests in `CelebornColumnarShuffleReaderSuite` fail because of the changes in #2609.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

`CelebornColumnarShuffleReaderSuite`

Closes #2726 from SteNicholas/CELEBORN-912.

Authored-by: SteNicholas <programgeek@163.com>
Signed-off-by: xiyu.zk <xiyu.zk@alibaba-inc.com>
s0nskar pushed a commit to s0nskar/celeborn that referenced this pull request Sep 16, 2024
…temptId

### What changes were proposed in this pull request?
Let attemptNumber = (stageAttemptId << 16) | taskAttemptNumber to differentiate map results that differ only in stageAttemptId.

### Why are the changes needed?
If we can't differentiate map tasks that differ only in stageAttemptId, shuffle read may mix the shuffle write batches of two map task attempts, causing data correctness issues.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Add ut: org.apache.spark.shuffle.celeborn.SparkShuffleManagerSuite#testWrongSparkConf_MaxAttemptLimit

Closes apache#2609 from jiang13021/spark_stage_attempt_id.

Lead-authored-by: jiang13021 <jiangyanze.jyz@antgroup.com>
Co-authored-by: Fu Chen <cfmcgrady@gmail.com>
Co-authored-by: Shuang <lvshuang.xjs@alibaba-inc.com>
Signed-off-by: Shuang <lvshuang.xjs@alibaba-inc.com>
s0nskar pushed a commit to s0nskar/celeborn that referenced this pull request Sep 16, 2024
### What changes were proposed in this pull request?

Introduce `spark-3.5-columnar-shuffle` module to support columnar shuffle for Spark 3.5.

Follow-up to apache#2710 and apache#2609.

### Why are the changes needed?

Tests in `CelebornColumnarShuffleReaderSuite` fail because of the changes in apache#2609.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

`CelebornColumnarShuffleReaderSuite`

Closes apache#2726 from SteNicholas/CELEBORN-912.

Authored-by: SteNicholas <programgeek@163.com>
Signed-off-by: xiyu.zk <xiyu.zk@alibaba-inc.com>
wankunde pushed a commit to wankunde/celeborn that referenced this pull request Oct 11, 2024
### What changes were proposed in this pull request?

Adds support for barrier stages.
This involves two aspects:
a) If there is a task failure while executing a barrier stage, all shuffle output for that stage attempt is discarded and ignored.
b) If a barrier stage is re-executed (for example, because a child stage hit a fetch failure), all shuffle output for the previous stage attempt is discarded and ignored.

This is similar to handling of indeterminate stages when `throwsFetchFailure` is `true`.

Note that this is supported only when `spark.celeborn.client.spark.fetch.throwsFetchFailure` is `true`.

### Why are the changes needed?

As detailed in CELEBORN-1518, Celeborn currently does not support barrier stages, which are essential Apache Spark functionality in wide use by Spark users.
Enhancing Celeborn will allow its use by a wider set of Spark users.

### Does this PR introduce _any_ user-facing change?

Adds ability for Celeborn to support Apache Spark Barrier stages.

### How was this patch tested?

Existing tests, and additional tests (thanks to jiang13021 in apache#2609 - [see here](https://github.com/apache/celeborn/pull/2609/files#diff-e17f15fcca26ddfc412f0af159c784d72417b0f22598e1b1ebfcacd6d4c3ad35))

Closes apache#2639 from mridulm/fix-barrier-stage-reexecution.

Lead-authored-by: Mridul Muralidharan <mridul@gmail.com>
Co-authored-by: Mridul Muralidharan <mridulatgmail.com>
Signed-off-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>
wankunde pushed a commit to wankunde/celeborn that referenced this pull request Oct 11, 2024
…temptId

### What changes were proposed in this pull request?
Let attemptNumber = (stageAttemptId << 16) | taskAttemptNumber to differentiate map results that differ only in stageAttemptId.

### Why are the changes needed?
If we can't differentiate map tasks that differ only in stageAttemptId, shuffle read may mix the shuffle write batches of two map task attempts, causing data correctness issues.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Add ut: org.apache.spark.shuffle.celeborn.SparkShuffleManagerSuite#testWrongSparkConf_MaxAttemptLimit

Closes apache#2609 from jiang13021/spark_stage_attempt_id.

Lead-authored-by: jiang13021 <jiangyanze.jyz@antgroup.com>
Co-authored-by: Fu Chen <cfmcgrady@gmail.com>
Co-authored-by: Shuang <lvshuang.xjs@alibaba-inc.com>
Signed-off-by: Shuang <lvshuang.xjs@alibaba-inc.com>
wankunde pushed a commit to wankunde/celeborn that referenced this pull request Oct 11, 2024
### What changes were proposed in this pull request?

Introduce `spark-3.5-columnar-shuffle` module to support columnar shuffle for Spark 3.5.

Follow-up to apache#2710 and apache#2609.

### Why are the changes needed?

Tests in `CelebornColumnarShuffleReaderSuite` fail because of the changes in apache#2609.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

`CelebornColumnarShuffleReaderSuite`

Closes apache#2726 from SteNicholas/CELEBORN-912.

Authored-by: SteNicholas <programgeek@163.com>
Signed-off-by: xiyu.zk <xiyu.zk@alibaba-inc.com>