Skip to content

Commit

Permalink
Add SparkCommonUtils.validateAttemptConfig
Browse files Browse the repository at this point in the history
  • Loading branch information
jiang13021 committed Aug 12, 2024
1 parent e0affd7 commit c0243d0
Show file tree
Hide file tree
Showing 4 changed files with 10 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,17 @@

package org.apache.spark.shuffle.celeborn;

import org.apache.spark.SparkConf;
import org.apache.spark.TaskContext;
import org.apache.spark.scheduler.DAGScheduler;

public class SparkCommonUtils {
public static void validateMaxAttempts(int maxStageAttempts, int maxTaskAttempts)
throws IllegalArgumentException {
public static void validateAttemptConfig(SparkConf conf) throws IllegalArgumentException {
int maxStageAttempts =
conf.getInt(
"spark.stage.maxConsecutiveAttempts",
DAGScheduler.DEFAULT_MAX_CONSECUTIVE_STAGE_ATTEMPTS());
int maxTaskAttempts = conf.getInt("spark.task.maxFailures", 4);
if (maxStageAttempts >= (1 << 15) || maxTaskAttempts >= (1 << 16)) {
// The map attemptId is a non-negative number constructed from
// both stageAttemptNumber and taskAttemptNumber.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import org.apache.spark.internal.config.package$;
import org.apache.spark.launcher.SparkLauncher;
import org.apache.spark.rdd.DeterministicLevel;
import org.apache.spark.scheduler.DAGScheduler;
import org.apache.spark.shuffle.*;
import org.apache.spark.shuffle.sort.SortShuffleManager;
import org.apache.spark.util.Utils;
Expand Down Expand Up @@ -67,12 +66,7 @@ public class SparkShuffleManager implements ShuffleManager {
private ExecutorShuffleIdTracker shuffleIdTracker = new ExecutorShuffleIdTracker();

public SparkShuffleManager(SparkConf conf, boolean isDriver) {
int maxStageAttempts =
conf.getInt(
"spark.stage.maxConsecutiveAttempts",
DAGScheduler.DEFAULT_MAX_CONSECUTIVE_STAGE_ATTEMPTS());
int maxTaskAttempts = (Integer) conf.get(package$.MODULE$.MAX_TASK_FAILURES());
SparkCommonUtils.validateMaxAttempts(maxStageAttempts, maxTaskAttempts);
SparkCommonUtils.validateAttemptConfig(conf);
this.conf = conf;
this.isDriver = isDriver;
this.celebornConf = SparkUtils.fromSparkConf(conf);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import org.apache.spark.internal.config.package$;
import org.apache.spark.launcher.SparkLauncher;
import org.apache.spark.rdd.DeterministicLevel;
import org.apache.spark.scheduler.DAGScheduler;
import org.apache.spark.shuffle.*;
import org.apache.spark.shuffle.sort.SortShuffleManager;
import org.apache.spark.sql.internal.SQLConf;
Expand Down Expand Up @@ -108,12 +107,7 @@ public SparkShuffleManager(SparkConf conf, boolean isDriver) {
key,
defaultValue);
}
int maxStageAttempts =
conf.getInt(
"spark.stage.maxConsecutiveAttempts",
DAGScheduler.DEFAULT_MAX_CONSECUTIVE_STAGE_ATTEMPTS());
int maxTaskAttempts = (Integer) conf.get(package$.MODULE$.TASK_MAX_FAILURES());
SparkCommonUtils.validateMaxAttempts(maxStageAttempts, maxTaskAttempts);
SparkCommonUtils.validateAttemptConfig(conf);
this.conf = conf;
this.isDriver = isDriver;
this.celebornConf = SparkUtils.fromSparkConf(conf);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import java.util.concurrent.atomic.AtomicBoolean
import org.apache.spark.{BarrierTaskContext, ShuffleDependency, SparkConf, SparkContextHelper, SparkException, TaskContext}
import org.apache.spark.celeborn.ExceptionMakerHelper
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContextHelper, TaskContext}
import org.apache.spark.shuffle.ShuffleHandle
import org.apache.spark.shuffle.celeborn.{CelebornShuffleHandle, ShuffleManagerHook, SparkShuffleManager, SparkUtils, TestCelebornShuffleManager}
import org.apache.spark.sql.{Row, RowFactory, SparkSession}
Expand All @@ -36,7 +35,6 @@ import org.scalatest.funsuite.AnyFunSuite

import org.apache.celeborn.client.ShuffleClient
import org.apache.celeborn.common.CelebornConf
import org.apache.celeborn.common.exception.CelebornIOException
import org.apache.celeborn.common.protocol.ShuffleMode
import org.apache.celeborn.service.deploy.worker.Worker

Expand Down

0 comments on commit c0243d0

Please sign in to comment.