-
Notifications
You must be signed in to change notification settings - Fork 370
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
[CELEBORN-1496][0.4] Differentiate map results with only different st…
…ageAttemptId backport #2609 to branch-0.4 ### What changes were proposed in this pull request? Let attemptNumber = (stageAttemptId << 16) | taskAttemptNumber, to differentiate map results with only different stageAttemptId. ### Why are the changes needed? If we can't differentiate map tasks with only different stageAttemptId, it may lead to mixed reading of two map tasks' shuffle write batches during shuffle read, causing data correctness issue. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Add ut: org.apache.spark.shuffle.celeborn.SparkShuffleManagerSuite#testWrongSparkConf_MaxAttemptLimit Closes #2609 from jiang13021/spark_stage_attempt_id. Lead-authored-by: jiang13021 <jiangyanze.jyzantgroup.com> Closes #2717 from cfmcgrady/CELEBORN-1496-branch-0.4. Authored-by: jiang13021 <jiangyanze.jyz@antgroup.com> Signed-off-by: Fu Chen <cfmcgrady@gmail.com>
- Loading branch information
1 parent
706dda7
commit f776035
Showing
12 changed files
with
160 additions
and
29 deletions.
There are no files selected for viewing
53 changes: 53 additions & 0 deletions
53
client-spark/common/src/main/java/org/apache/spark/shuffle/celeborn/SparkCommonUtils.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,53 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one or more | ||
* contributor license agreements. See the NOTICE file distributed with | ||
* this work for additional information regarding copyright ownership. | ||
* The ASF licenses this file to You under the Apache License, Version 2.0 | ||
* (the "License"); you may not use this file except in compliance with | ||
* the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package org.apache.spark.shuffle.celeborn; | ||
|
||
import org.apache.spark.SparkConf; | ||
import org.apache.spark.TaskContext; | ||
import org.apache.spark.scheduler.DAGScheduler; | ||
|
||
public class SparkCommonUtils { | ||
public static void validateAttemptConfig(SparkConf conf) throws IllegalArgumentException { | ||
int maxStageAttempts = | ||
conf.getInt( | ||
"spark.stage.maxConsecutiveAttempts", | ||
DAGScheduler.DEFAULT_MAX_CONSECUTIVE_STAGE_ATTEMPTS()); | ||
// In Spark 2, the parameter is referred to as MAX_TASK_FAILURES, while in Spark 3, it has been | ||
// changed to TASK_MAX_FAILURES. The default value for both is consistently set to 4. | ||
int maxTaskAttempts = conf.getInt("spark.task.maxFailures", 4); | ||
if (maxStageAttempts >= (1 << 15) || maxTaskAttempts >= (1 << 16)) { | ||
// The map attemptId is a non-negative number constructed from | ||
// both stageAttemptNumber and taskAttemptNumber. | ||
// The high 16 bits of the map attemptId are used for the stageAttemptNumber, | ||
// and the low 16 bits are used for the taskAttemptNumber. | ||
// So spark.stage.maxConsecutiveAttempts should be less than 32768 (1 << 15) | ||
// and spark.task.maxFailures should be less than 65536 (1 << 16). | ||
throw new IllegalArgumentException( | ||
"The spark.stage.maxConsecutiveAttempts should be less than 32768 (currently " | ||
+ maxStageAttempts | ||
+ ")" | ||
+ "and spark.task.maxFailures should be less than 65536 (currently " | ||
+ maxTaskAttempts | ||
+ ")."); | ||
} | ||
} | ||
|
||
public static int getEncodedAttemptNumber(TaskContext context) { | ||
return (context.stageAttemptNumber() << 16) | context.attemptNumber(); | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.