Commit ea92bca: initial commit
sunchao committed May 31, 2024
1 parent 4360ec7
Showing 3 changed files with 61 additions and 40 deletions.
core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala (34 additions & 37 deletions)
@@ -50,6 +50,7 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
   var executorCores: String = null
   var totalExecutorCores: String = null
   var propertiesFile: String = null
+  private var extraPropertiesFile: Boolean = false
   var driverMemory: String = null
   var driverExtraClassPath: String = null
   var driverExtraLibraryPath: String = null
@@ -87,27 +88,6 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
 
   override protected def logName: String = classOf[SparkSubmitArguments].getName
 
-  /** Default properties present in the currently defined defaults file. */
-  lazy val defaultSparkProperties: HashMap[String, String] = {
-    val defaultProperties = new HashMap[String, String]()
-    if (verbose) {
-      logInfo(log"Using properties file: ${MDC(PATH, propertiesFile)}")
-    }
-    Option(propertiesFile).foreach { filename =>
-      val properties = Utils.getPropertiesFromFile(filename)
-      properties.foreach { case (k, v) =>
-        defaultProperties(k) = v
-      }
-      // Property files may contain sensitive information, so redact before printing
-      if (verbose) {
-        Utils.redact(properties).foreach { case (k, v) =>
-          logInfo(log"Adding default property: ${MDC(KEY, k)}=${MDC(VALUE, v)}")
-        }
-      }
-    }
-    defaultProperties
-  }
-
   // Set parameters from command line arguments
   parse(args.asJava)
 
@@ -123,31 +103,43 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
   validateArguments()
 
   /**
-   * Merge values from the default properties file with those specified through --conf.
-   * When this is called, `sparkProperties` is already filled with configs from the latter.
+   * Load properties from the file with the given path into `sparkProperties`.
+   * No-op if the file path is null
    */
-  private def mergeDefaultSparkProperties(): Unit = {
-    // Honor --conf before the specified properties file and defaults file
-    defaultSparkProperties.foreach { case (k, v) =>
-      if (!sparkProperties.contains(k)) {
-        sparkProperties(k) = v
+  private def loadPropertiesFromFile(filePath: String): Unit = {
+    if (filePath != null) {
+      if (verbose) {
+        logInfo(log"Using properties file: ${MDC(PATH, filePath)}")
       }
-    }
-
-    // Also load properties from `spark-defaults.conf` if they do not exist in the properties file
-    // and --conf list
-    val defaultSparkConf = Utils.getDefaultPropertiesFile(env)
-    Option(defaultSparkConf).foreach { filename =>
-      val properties = Utils.getPropertiesFromFile(filename)
+      val properties = Utils.getPropertiesFromFile(filePath)
       properties.foreach { case (k, v) =>
         if (!sparkProperties.contains(k)) {
           sparkProperties(k) = v
         }
       }
+      // Property files may contain sensitive information, so redact before printing
+      if (verbose) {
+        Utils.redact(properties).foreach { case (k, v) =>
+          logInfo(log"Adding default property: ${MDC(KEY, k)}=${MDC(VALUE, v)}")
+        }
+      }
     }
+  }
 
-    if (propertiesFile == null) {
-      propertiesFile = defaultSparkConf
+  /**
+   * Merge values from the default properties file with those specified through --conf.
+   * When this is called, `sparkProperties` is already filled with configs from the latter.
+   */
+  private def mergeDefaultSparkProperties(): Unit = {
+    // Honor --conf before the specified properties file and defaults file
+    loadPropertiesFromFile(propertiesFile)
+
+    // Also load properties from `spark-defaults.conf` if they do not exist in the properties file
+    // and --conf list when:
+    //   - no input properties file is specified
+    //   - input properties file is specified, but `--extra-properties-files` flag is set
+    if (propertiesFile == null || extraPropertiesFile) {
+      loadPropertiesFromFile(Utils.getDefaultPropertiesFile(env))
     }
   }
 
@@ -405,6 +397,9 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
       case PROPERTIES_FILE =>
         propertiesFile = value
 
+      case EXTRA_PROPERTIES_FILES =>
+        extraPropertiesFile = true
+
       case KILL_SUBMISSION =>
         submissionToKill = value
         if (action != null) {
@@ -548,6 +543,8 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
         |  --conf, -c PROP=VALUE       Arbitrary Spark configuration property.
         |  --properties-file FILE      Path to a file from which to load extra properties. If not
         |                              specified, this will look for conf/spark-defaults.conf.
+        |  --extra-properties-files    Whether to load properties from conf/spark-defaults.conf,
+        |                              even if --properties-file is specified.
         |
         |  --driver-memory MEM         Memory for driver (e.g. 1000M, 2G) (Default: ${mem_mb}M).
         |  --driver-java-options       Extra Java options to pass to the driver.
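Note on the semantics above: `--conf` entries win over the `--properties-file`, which in turn wins over `conf/spark-defaults.conf`, and the defaults file is only consulted when no properties file was given or `--extra-properties-files` is set. A minimal, self-contained Scala sketch of that merge order (illustrative only, not Spark code; `PrecedenceSketch`, `merge`, and the parameter names are invented here):

import scala.collection.mutable

object PrecedenceSketch {
  // Merge configs in the order the commit implements: --conf first, then the
  // --properties-file, then (conditionally) spark-defaults.conf. Keys already
  // present are never overwritten, so earlier sources take precedence.
  def merge(
      confArgs: Map[String, String],               // from --conf / -c
      propertiesFile: Option[Map[String, String]], // from --properties-file
      sparkDefaults: Map[String, String],          // conf/spark-defaults.conf
      extraPropertiesFiles: Boolean): Map[String, String] = {
    val props = mutable.HashMap[String, String]() ++= confArgs
    def loadIfAbsent(source: Map[String, String]): Unit =
      source.foreach { case (k, v) => if (!props.contains(k)) props(k) = v }
    propertiesFile.foreach(loadIfAbsent)
    // spark-defaults.conf applies only without a properties file, or with the flag.
    if (propertiesFile.isEmpty || extraPropertiesFiles) loadIfAbsent(sparkDefaults)
    props.toMap
  }

  def main(args: Array[String]): Unit = {
    val merged = merge(
      confArgs = Map("spark.executor.cores" -> "8"),
      propertiesFile = Some(Map("spark.driver.memory" -> "4g")),
      sparkDefaults = Map("spark.executor.memory" -> "3g", "spark.driver.memory" -> "3g"),
      extraPropertiesFiles = true)
    // Prints: cores from --conf, driver memory from the properties file,
    // executor memory filled in from spark-defaults.conf.
    merged.toSeq.sorted.foreach(println)
  }
}

This mirrors the tests below: with the flag, `spark.executor.memory` comes from `SPARK_CONF_DIR`; without it, that key stays unset.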
core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala (25 additions & 3 deletions)
@@ -1113,19 +1113,41 @@ class SparkSubmitSuite
     }
   }
 
-  test("SPARK-48392: Allow both spark-defaults.conf and properties file") {
-    forConfDir(Map("spark.executor.memory" -> "3g")) { path =>
-      withPropertyFile("spark-conf.properties", Map("spark.executor.cores" -> "16")) { propsFile =>
+  test("SPARK-48392: load spark-defaults.conf when --extra-properties-files is set") {
+    forConfDir(Map("spark.executor.memory" -> "3g", "spark.driver.memory" -> "3g")) { path =>
+      withPropertyFile("spark-conf.properties",
+        Map("spark.executor.cores" -> "16", "spark.driver.memory" -> "4g")) { propsFile =>
         val unusedJar = TestUtils.createJarWithClasses(Seq.empty)
         val args = Seq(
           "--class", SimpleApplicationTest.getClass.getName.stripSuffix("$"),
           "--name", "testApp",
           "--master", "local",
           "--properties-file", propsFile,
+          "--extra-properties-files",
           unusedJar.toString)
         val appArgs = new SparkSubmitArguments(args, env = Map("SPARK_CONF_DIR" -> path))
         appArgs.executorCores should be("16")
         appArgs.executorMemory should be("3g")
+        appArgs.driverMemory should be("4g")
       }
     }
   }
+
+  test("SPARK-48392: should skip spark-defaults.conf when --extra-properties-files is not set") {
+    forConfDir(Map("spark.executor.memory" -> "3g", "spark.driver.memory" -> "3g")) { path =>
+      withPropertyFile("spark-conf.properties",
+        Map("spark.executor.cores" -> "16", "spark.driver.memory" -> "4g")) { propsFile =>
+        val unusedJar = TestUtils.createJarWithClasses(Seq.empty)
+        val args = Seq(
+          "--class", SimpleApplicationTest.getClass.getName.stripSuffix("$"),
+          "--name", "testApp",
+          "--master", "local",
+          "--properties-file", propsFile,
+          unusedJar.toString)
+        val appArgs = new SparkSubmitArguments(args, env = Map("SPARK_CONF_DIR" -> path))
+        appArgs.executorCores should be("16")
+        appArgs.driverMemory should be("4g")
+        appArgs.executorMemory should be(null)
+      }
+    }
+  }
launcher/src/main/java/org/apache/spark/launcher/SparkSubmitOptionParser.java (2 additions & 0 deletions)
@@ -55,6 +55,7 @@ class SparkSubmitOptionParser {
   protected final String PACKAGES = "--packages";
   protected final String PACKAGES_EXCLUDE = "--exclude-packages";
   protected final String PROPERTIES_FILE = "--properties-file";
+  protected final String EXTRA_PROPERTIES_FILES = "--extra-properties-files";
   protected final String PROXY_USER = "--proxy-user";
   protected final String PY_FILES = "--py-files";
   protected final String REPOSITORIES = "--repositories";
@@ -130,6 +131,7 @@
     { USAGE_ERROR },
     { VERBOSE, "-v" },
     { VERSION },
+    { EXTRA_PROPERTIES_FILES },
   };
 
   /**
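The table being extended here holds options that take no value (it lists bare flags such as VERBOSE and VERSION), as opposed to the parser's other table of options that consume a following argument, which is why `--extra-properties-files` is registered without a value slot. A rough Scala sketch of that two-table dispatch (a simplified stand-in, not the actual Java parser; `MiniOptionParser` and its tables are hypothetical):

object MiniOptionParser {
  // Options that consume the next argument as their value.
  private val opts = Set("--properties-file", "--class", "--name", "--master")
  // Bare switches: present or absent, no value consumed.
  private val switches = Set("--verbose", "--extra-properties-files")

  def parse(args: List[String], acc: Map[String, String] = Map.empty): Map[String, String] =
    args match {
      case opt :: value :: rest if opts(opt) => parse(rest, acc + (opt -> value))
      case sw :: rest if switches(sw)        => parse(rest, acc + (sw -> "true"))
      case _ :: rest                         => parse(rest, acc) // positional arg (app jar, app args)
      case Nil                               => acc
    }

  def main(args: Array[String]): Unit = {
    val parsed = parse(List(
      "--properties-file", "my.properties", "--extra-properties-files", "app.jar"))
    println(parsed)
    // Map(--properties-file -> my.properties, --extra-properties-files -> true)
  }
}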
