
Commit 05372d1

HeartSaVioR authored and Marcelo Vanzin committed
[SPARK-26489][CORE] Use ConfigEntry for hardcoded configs for python/r categories
## What changes were proposed in this pull request?

This PR makes the hardcoded configs below use ConfigEntry.

* spark.pyspark
* spark.python
* spark.r

This patch doesn't change configs that are not relevant to SparkConf (e.g. system properties, python source code).

## How was this patch tested?

Existing tests.

Closes #23428 from HeartSaVioR/SPARK-26489.

Authored-by: Jungtaek Lim (HeartSaVioR) <kabhwan@gmail.com>
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
1 parent e2dbafd commit 05372d1

13 files changed, +96 −40 lines
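
The whole commit applies one pattern: replace string-keyed lookups such as conf.getBoolean("spark.python.worker.reuse", true) with typed entries declared once in a config object. As a minimal, self-contained sketch of that idea (the Entry and ConfLike types below are illustrative stand-ins, not Spark's private[spark] ConfigEntry/ConfigBuilder):

object ConfigEntrySketch {
  // A typed entry carries its key, its default, and how to parse the raw string.
  final case class Entry[T](key: String, default: T, parse: String => T)

  // A toy conf backed by a Map[String, String], standing in for SparkConf's internals.
  final class ConfLike(settings: Map[String, String]) {
    def get[T](entry: Entry[T]): T =
      settings.get(entry.key).map(entry.parse).getOrElse(entry.default)
  }

  // Declared once, next to its default -- analogous to config/Python.scala further below.
  val PYTHON_WORKER_REUSE: Entry[Boolean] =
    Entry("spark.python.worker.reuse", true, (s: String) => s.toBoolean)

  def main(args: Array[String]): Unit = {
    val conf = new ConfLike(Map("spark.python.worker.reuse" -> "false"))
    // Old style repeated the key and default at every call site:
    //   conf.getBoolean("spark.python.worker.reuse", true)
    // New style is one typed lookup against the shared entry:
    println(conf.get(PYTHON_WORKER_REUSE)) // prints: false
  }
}

The per-file diffs that follow are all instances of this swap, plus the two new declaration objects Python and R.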

core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala

Lines changed: 3 additions & 3 deletions

@@ -27,7 +27,7 @@ import scala.collection.JavaConverters._
 
 import org.apache.spark._
 import org.apache.spark.internal.Logging
-import org.apache.spark.internal.config.PYSPARK_EXECUTOR_MEMORY
+import org.apache.spark.internal.config.Python._
 import org.apache.spark.security.SocketAuthHelper
 import org.apache.spark.util._
 
@@ -71,7 +71,7 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
 
   private val conf = SparkEnv.get.conf
   private val bufferSize = conf.getInt("spark.buffer.size", 65536)
-  private val reuseWorker = conf.getBoolean("spark.python.worker.reuse", true)
+  private val reuseWorker = conf.get(PYTHON_WORKER_REUSE)
   // each python worker gets an equal part of the allocation. the worker pool will grow to the
   // number of concurrent tasks, which is determined by the number of cores in this executor.
   private val memoryMb = conf.get(PYSPARK_EXECUTOR_MEMORY)

@@ -496,7 +496,7 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
     extends Thread(s"Worker Monitor for $pythonExec") {
 
     /** How long to wait before killing the python worker if a task cannot be interrupted. */
-    private val taskKillTimeout = env.conf.getTimeAsMs("spark.python.task.killTimeout", "2s")
+    private val taskKillTimeout = env.conf.get(PYTHON_TASK_KILL_TIMEOUT)
 
     setDaemon(true)
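
The taskKillTimeout change above swaps getTimeAsMs("spark.python.task.killTimeout", "2s") for a time-typed entry whose "2s" default is declared once in Python.scala. As a rough illustration of what such a default resolves to at the call site (a Long in milliseconds), here is a small stand-alone parser; it is not Spark's parser and only handles a few common suffixes.

import java.util.concurrent.TimeUnit

object TimeConfSketch {
  // Hypothetical helper; Spark's real parsing sits behind
  // ConfigBuilder.timeConf(TimeUnit.MILLISECONDS).createWithDefaultString("2s").
  def timeStringAsMs(s: String): Long = {
    val trimmed = s.trim.toLowerCase
    val (num, unit) =
      if (trimmed.endsWith("ms")) (trimmed.dropRight(2), TimeUnit.MILLISECONDS)
      else if (trimmed.endsWith("s")) (trimmed.dropRight(1), TimeUnit.SECONDS)
      else if (trimmed.endsWith("m")) (trimmed.dropRight(1), TimeUnit.MINUTES)
      else (trimmed, TimeUnit.MILLISECONDS) // bare number: treat as millis in this sketch
    unit.toMillis(num.toLong)
  }

  def main(args: Array[String]): Unit = {
    println(timeStringAsMs("2s"))    // 2000
    println(timeStringAsMs("500ms")) // 500
  }
}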

core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala

Lines changed: 8 additions & 7 deletions

@@ -28,6 +28,7 @@ import scala.collection.mutable
 
 import org.apache.spark._
 import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.Python._
 import org.apache.spark.security.SocketAuthHelper
 import org.apache.spark.util.{RedirectThread, Utils}
 
@@ -41,7 +42,7 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String
   // currently only works on UNIX-based systems now because it uses signals for child management,
   // so we can also fall back to launching workers, pyspark/worker.py (by default) directly.
   private val useDaemon = {
-    val useDaemonEnabled = SparkEnv.get.conf.getBoolean("spark.python.use.daemon", true)
+    val useDaemonEnabled = SparkEnv.get.conf.get(PYTHON_USE_DAEMON)
 
     // This flag is ignored on Windows as it's unable to fork.
     !System.getProperty("os.name").startsWith("Windows") && useDaemonEnabled

@@ -53,21 +54,21 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String
 
   // This configuration indicates the module to run the daemon to execute its Python workers.
   private val daemonModule =
-    SparkEnv.get.conf.getOption("spark.python.daemon.module").map { value =>
+    SparkEnv.get.conf.get(PYTHON_DAEMON_MODULE).map { value =>
       logInfo(
-        s"Python daemon module in PySpark is set to [$value] in 'spark.python.daemon.module', " +
+        s"Python daemon module in PySpark is set to [$value] in '${PYTHON_DAEMON_MODULE.key}', " +
         "using this to start the daemon up. Note that this configuration only has an effect when " +
-        "'spark.python.use.daemon' is enabled and the platform is not Windows.")
+        s"'${PYTHON_USE_DAEMON.key}' is enabled and the platform is not Windows.")
       value
     }.getOrElse("pyspark.daemon")
 
   // This configuration indicates the module to run each Python worker.
   private val workerModule =
-    SparkEnv.get.conf.getOption("spark.python.worker.module").map { value =>
+    SparkEnv.get.conf.get(PYTHON_WORKER_MODULE).map { value =>
       logInfo(
-        s"Python worker module in PySpark is set to [$value] in 'spark.python.worker.module', " +
+        s"Python worker module in PySpark is set to [$value] in '${PYTHON_WORKER_MODULE.key}', " +
        "using this to start the worker up. Note that this configuration only has an effect when " +
-        "'spark.python.use.daemon' is disabled or the platform is Windows.")
+        s"'${PYTHON_USE_DAEMON.key}' is disabled or the platform is Windows.")
       value
     }.getOrElse("pyspark.worker")
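
PYTHON_DAEMON_MODULE and PYTHON_WORKER_MODULE are created with .createOptional, so conf.get returns an Option[String] and the old getOption call sites keep their shape, while ${ENTRY.key} keeps the log text in sync with the declared key. A small stand-alone sketch of that shape, using toy OptEntry/ConfLike types rather than Spark's internals:

object DaemonModuleSketch {
  // Toy stand-ins for the optional entry and the conf lookup used above.
  final case class OptEntry(key: String)
  final class ConfLike(settings: Map[String, String]) {
    def get(entry: OptEntry): Option[String] = settings.get(entry.key)
  }

  val PYTHON_DAEMON_MODULE = OptEntry("spark.python.daemon.module")

  def main(args: Array[String]): Unit = {
    val conf = new ConfLike(Map.empty)
    // Same shape as the daemonModule initialiser above: Option.map for the
    // configured case, getOrElse for the default module name.
    val daemonModule = conf.get(PYTHON_DAEMON_MODULE).map { value =>
      println(s"Python daemon module is set to [$value] in '${PYTHON_DAEMON_MODULE.key}'")
      value
    }.getOrElse("pyspark.daemon")
    println(daemonModule) // pyspark.daemon when the key is unset
  }
}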

core/src/main/scala/org/apache/spark/api/r/RBackend.scala

Lines changed: 4 additions & 6 deletions

@@ -32,6 +32,7 @@ import io.netty.handler.timeout.ReadTimeoutHandler
 
 import org.apache.spark.SparkConf
 import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.R._
 
 /**
  * Netty-based backend server that is used to communicate between R and Java.

@@ -47,10 +48,8 @@ private[spark] class RBackend {
 
  def init(): (Int, RAuthHelper) = {
    val conf = new SparkConf()
-    val backendConnectionTimeout = conf.getInt(
-      "spark.r.backendConnectionTimeout", SparkRDefaults.DEFAULT_CONNECTION_TIMEOUT)
-    bossGroup = new NioEventLoopGroup(
-      conf.getInt("spark.r.numRBackendThreads", SparkRDefaults.DEFAULT_NUM_RBACKEND_THREADS))
+    val backendConnectionTimeout = conf.get(R_BACKEND_CONNECTION_TIMEOUT)
+    bossGroup = new NioEventLoopGroup(conf.get(R_NUM_BACKEND_THREADS))
    val workerGroup = bossGroup
    val handler = new RBackendHandler(this)
    val authHelper = new RAuthHelper(conf)

@@ -126,8 +125,7 @@ private[spark] object RBackend extends Logging {
    // Connection timeout is set by socket client. To make it configurable we will pass the
    // timeout value to client inside the temp file
    val conf = new SparkConf()
-    val backendConnectionTimeout = conf.getInt(
-      "spark.r.backendConnectionTimeout", SparkRDefaults.DEFAULT_CONNECTION_TIMEOUT)
+    val backendConnectionTimeout = conf.get(R_BACKEND_CONNECTION_TIMEOUT)
 
    // tell the R process via temporary file
    val path = args(0)

core/src/main/scala/org/apache/spark/api/r/RBackendHandler.scala

Lines changed: 3 additions & 4 deletions

@@ -29,6 +29,7 @@ import io.netty.handler.timeout.ReadTimeoutException
 import org.apache.spark.SparkConf
 import org.apache.spark.api.r.SerDe._
 import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.R._
 import org.apache.spark.util.{ThreadUtils, Utils}
 
 /**

@@ -98,10 +99,8 @@ private[r] class RBackendHandler(server: RBackend)
        }
      }
      val conf = new SparkConf()
-      val heartBeatInterval = conf.getInt(
-        "spark.r.heartBeatInterval", SparkRDefaults.DEFAULT_HEARTBEAT_INTERVAL)
-      val backendConnectionTimeout = conf.getInt(
-        "spark.r.backendConnectionTimeout", SparkRDefaults.DEFAULT_CONNECTION_TIMEOUT)
+      val heartBeatInterval = conf.get(R_HEARTBEAT_INTERVAL)
+      val backendConnectionTimeout = conf.get(R_BACKEND_CONNECTION_TIMEOUT)
      val interval = Math.min(heartBeatInterval, backendConnectionTimeout - 1)
 
      execService.scheduleAtFixedRate(pingRunner, interval, interval, TimeUnit.SECONDS)
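
A side note on the unchanged line kept in this hunk: the ping period is Math.min(heartBeatInterval, backendConnectionTimeout - 1), so the keep-alive is always scheduled at least one second more often than the backend's read timeout and an idle but healthy connection is never dropped. A quick stand-alone check of that arithmetic with the defaults R.scala declares (100 and 6000, both in seconds):

object HeartbeatIntervalSketch {
  // Same expression as in RBackendHandler, extracted for a standalone check.
  def pingPeriodSeconds(heartBeatInterval: Int, backendConnectionTimeout: Int): Int =
    math.min(heartBeatInterval, backendConnectionTimeout - 1)

  def main(args: Array[String]): Unit = {
    println(pingPeriodSeconds(100, 6000)) // 100 -> ping every 100 seconds with the defaults
    println(pingPeriodSeconds(100, 50))   // 49  -> clamped just below a shorter timeout
  }
}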

core/src/main/scala/org/apache/spark/api/r/RRunner.scala

Lines changed: 4 additions & 4 deletions

@@ -27,6 +27,7 @@ import scala.util.Try
 import org.apache.spark._
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.R._
 import org.apache.spark.util.Utils
 
 /**

@@ -340,11 +341,10 @@ private[r] object RRunner {
    // "spark.sparkr.r.command" is deprecated and replaced by "spark.r.command",
    // but kept here for backward compatibility.
    val sparkConf = SparkEnv.get.conf
-    var rCommand = sparkConf.get("spark.sparkr.r.command", "Rscript")
-    rCommand = sparkConf.get("spark.r.command", rCommand)
+    var rCommand = sparkConf.get(SPARKR_COMMAND)
+    rCommand = sparkConf.get(R_COMMAND).orElse(Some(rCommand)).get
 
-    val rConnectionTimeout = sparkConf.getInt(
-      "spark.r.backendConnectionTimeout", SparkRDefaults.DEFAULT_CONNECTION_TIMEOUT)
+    val rConnectionTimeout = sparkConf.get(R_BACKEND_CONNECTION_TIMEOUT)
    val rOptions = "--vanilla"
    val rLibDir = RUtils.sparkRPackagePath(isDriver = false)
    val rExecScript = rLibDir(0) + "/SparkR/worker/" + script
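
The rCommand change above preserves the documented deprecation: SPARKR_COMMAND ("spark.sparkr.r.command", default "Rscript") still supplies the base value, and the optional R_COMMAND ("spark.r.command") overrides it when set, which is what sparkConf.get(R_COMMAND).orElse(Some(rCommand)).get expresses. A stand-alone sketch of that resolution order over a plain Map (the example command paths are made up):

object RCommandFallbackSketch {
  // Mirrors the resolution above with a plain Map instead of SparkConf:
  // the deprecated key supplies a value (or "Rscript"), and the optional
  // replacement key overrides it when present.
  def resolveRCommand(settings: Map[String, String]): String = {
    val fromDeprecatedKey = settings.getOrElse("spark.sparkr.r.command", "Rscript")
    settings.get("spark.r.command").getOrElse(fromDeprecatedKey)
  }

  def main(args: Array[String]): Unit = {
    println(resolveRCommand(Map.empty))                                             // Rscript
    println(resolveRCommand(Map("spark.sparkr.r.command" -> "/opt/R/bin/Rscript"))) // /opt/R/bin/Rscript
    println(resolveRCommand(Map("spark.r.command" -> "Rscript-custom")))            // Rscript-custom
  }
}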

core/src/main/scala/org/apache/spark/deploy/RRunner.scala

Lines changed: 5 additions & 4 deletions

@@ -25,7 +25,8 @@ import scala.collection.JavaConverters._
 import org.apache.hadoop.fs.Path
 
 import org.apache.spark.{SparkException, SparkUserAppException}
-import org.apache.spark.api.r.{RBackend, RUtils, SparkRDefaults}
+import org.apache.spark.api.r.{RBackend, RUtils}
+import org.apache.spark.internal.config.R._
 import org.apache.spark.util.RedirectThread
 
 /**

@@ -43,8 +44,8 @@ object RRunner {
    val rCommand = {
      // "spark.sparkr.r.command" is deprecated and replaced by "spark.r.command",
      // but kept here for backward compatibility.
-      var cmd = sys.props.getOrElse("spark.sparkr.r.command", "Rscript")
-      cmd = sys.props.getOrElse("spark.r.command", cmd)
+      var cmd = sys.props.getOrElse(SPARKR_COMMAND.key, SPARKR_COMMAND.defaultValue.get)
+      cmd = sys.props.getOrElse(R_COMMAND.key, cmd)
      if (sys.props.getOrElse("spark.submit.deployMode", "client") == "client") {
        cmd = sys.props.getOrElse("spark.r.driver.command", cmd)
      }

@@ -53,7 +54,7 @@ object RRunner {
 
    // Connection timeout set by R process on its connection to RBackend in seconds.
    val backendConnectionTimeout = sys.props.getOrElse(
-      "spark.r.backendConnectionTimeout", SparkRDefaults.DEFAULT_CONNECTION_TIMEOUT.toString)
+      R_BACKEND_CONNECTION_TIMEOUT.key, R_BACKEND_CONNECTION_TIMEOUT.defaultValue.get.toString)
 
    // Check if the file path exists.
    // If not, change directory to current working directory for YARN cluster mode
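
deploy/RRunner reads from system properties rather than SparkConf, so it cannot call conf.get(entry); the entries still help because entry.key and entry.defaultValue keep the key string and its default in one place. A stand-alone sketch with a toy Entry type (not Spark's ConfigEntry):

object SysPropsFallbackSketch {
  // Toy entry; any pair of (key, defaultValue) is enough to make the point.
  final case class Entry(key: String, defaultValue: Option[String])

  val SPARKR_COMMAND = Entry("spark.sparkr.r.command", Some("Rscript"))
  val R_COMMAND = Entry("spark.r.command", None)

  def main(args: Array[String]): Unit = {
    // Same shape as deploy/RRunner above: the entry supplies the canonical key
    // and default even though the value comes from system properties.
    var cmd = sys.props.getOrElse(SPARKR_COMMAND.key, SPARKR_COMMAND.defaultValue.get)
    cmd = sys.props.getOrElse(R_COMMAND.key, cmd)
    println(cmd) // "Rscript" unless one of the two properties is set
  }
}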

core/src/main/scala/org/apache/spark/internal/config/Python.scala (new file)

Lines changed: 47 additions & 0 deletions

@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.internal.config
+
+import java.util.concurrent.TimeUnit
+
+import org.apache.spark.network.util.ByteUnit
+
+private[spark] object Python {
+  val PYTHON_WORKER_REUSE = ConfigBuilder("spark.python.worker.reuse")
+    .booleanConf
+    .createWithDefault(true)
+
+  val PYTHON_TASK_KILL_TIMEOUT = ConfigBuilder("spark.python.task.killTimeout")
+    .timeConf(TimeUnit.MILLISECONDS)
+    .createWithDefaultString("2s")
+
+  val PYTHON_USE_DAEMON = ConfigBuilder("spark.python.use.daemon")
+    .booleanConf
+    .createWithDefault(true)
+
+  val PYTHON_DAEMON_MODULE = ConfigBuilder("spark.python.daemon.module")
+    .stringConf
+    .createOptional
+
+  val PYTHON_WORKER_MODULE = ConfigBuilder("spark.python.worker.module")
+    .stringConf
+    .createOptional
+
+  val PYSPARK_EXECUTOR_MEMORY = ConfigBuilder("spark.executor.pyspark.memory")
+    .bytesConf(ByteUnit.MiB)
+    .createOptional
+}
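
PYSPARK_EXECUTOR_MEMORY is the one byte-sized entry here: bytesConf(ByteUnit.MiB).createOptional means callers get an Option[Long] already normalized to mebibytes. As a rough, hedged stand-in for that normalization (not Spark's actual parser), a tiny converter and what it yields for typical values:

object ByteConfSketch {
  // Only a few suffixes are handled; Spark's real parser accepts more and,
  // for this entry, treats a bare number as MiB.
  def byteStringAsMb(raw: String): Long = {
    val v = raw.trim.toLowerCase
    if (v.endsWith("g")) v.dropRight(1).toLong * 1024L
    else if (v.endsWith("m")) v.dropRight(1).toLong
    else if (v.endsWith("k")) v.dropRight(1).toLong / 1024L
    else v.toLong // bare number: assume MiB, matching ByteUnit.MiB in this sketch
  }

  def main(args: Array[String]): Unit = {
    println(byteStringAsMb("512m")) // 512
    println(byteStringAsMb("2g"))   // 2048
  }
}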

core/src/main/scala/org/apache/spark/api/r/SparkRDefaults.scala renamed to core/src/main/scala/org/apache/spark/internal/config/R.scala

Lines changed: 18 additions & 8 deletions

@@ -14,17 +14,27 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+package org.apache.spark.internal.config
 
-package org.apache.spark.api.r
+private[spark] object R {
 
-private[spark] object SparkRDefaults {
+  val R_BACKEND_CONNECTION_TIMEOUT = ConfigBuilder("spark.r.backendConnectionTimeout")
+    .intConf
+    .createWithDefault(6000)
 
-  // Default value for spark.r.backendConnectionTimeout config
-  val DEFAULT_CONNECTION_TIMEOUT: Int = 6000
+  val R_NUM_BACKEND_THREADS = ConfigBuilder("spark.r.numRBackendThreads")
+    .intConf
+    .createWithDefault(2)
 
-  // Default value for spark.r.heartBeatInterval config
-  val DEFAULT_HEARTBEAT_INTERVAL: Int = 100
+  val R_HEARTBEAT_INTERVAL = ConfigBuilder("spark.r.heartBeatInterval")
+    .intConf
+    .createWithDefault(100)
 
-  // Default value for spark.r.numRBackendThreads config
-  val DEFAULT_NUM_RBACKEND_THREADS = 2
+  val SPARKR_COMMAND = ConfigBuilder("spark.sparkr.r.command")
+    .stringConf
+    .createWithDefault("Rscript")
+
+  val R_COMMAND = ConfigBuilder("spark.r.command")
+    .stringConf
+    .createOptional
 }

core/src/main/scala/org/apache/spark/internal/config/package.scala

Lines changed: 0 additions & 4 deletions

@@ -166,10 +166,6 @@ package object config {
     .checkValue(_ >= 0, "The off-heap memory size must not be negative")
     .createWithDefault(0)
 
-  private[spark] val PYSPARK_EXECUTOR_MEMORY = ConfigBuilder("spark.executor.pyspark.memory")
-    .bytesConf(ByteUnit.MiB)
-    .createOptional
-
   private[spark] val IS_PYTHON_APP = ConfigBuilder("spark.yarn.isPython").internal()
     .booleanConf.createWithDefault(false)
 

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala

Lines changed: 1 addition & 0 deletions

@@ -25,6 +25,7 @@ import org.apache.spark.deploy.k8s._
 import org.apache.spark.deploy.k8s.Config._
 import org.apache.spark.deploy.k8s.Constants._
 import org.apache.spark.internal.config._
+import org.apache.spark.internal.config.Python._
 import org.apache.spark.rpc.RpcEndpointAddress
 import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
 import org.apache.spark.util.Utils

resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala

Lines changed: 1 addition & 0 deletions

@@ -30,6 +30,7 @@ import org.apache.spark.deploy.k8s.{KubernetesExecutorConf, KubernetesTestConf,
 import org.apache.spark.deploy.k8s.Config._
 import org.apache.spark.deploy.k8s.Constants._
 import org.apache.spark.internal.config._
+import org.apache.spark.internal.config.Python._
 import org.apache.spark.rpc.RpcEndpointAddress
 import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
 import org.apache.spark.util.Utils

resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala

Lines changed: 1 addition & 0 deletions

@@ -53,6 +53,7 @@ import org.apache.spark.deploy.yarn.config._
 import org.apache.spark.deploy.yarn.security.YARNHadoopDelegationTokenManager
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config._
+import org.apache.spark.internal.config.Python._
 import org.apache.spark.launcher.{LauncherBackend, SparkAppHandle, YarnCommandBuilderUtils}
 import org.apache.spark.util.{CallerContext, Utils}
 

resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala

Lines changed: 1 addition & 0 deletions

@@ -36,6 +36,7 @@ import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._
 import org.apache.spark.deploy.yarn.config._
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config._
+import org.apache.spark.internal.config.Python._
 import org.apache.spark.rpc.{RpcCallContext, RpcEndpointRef}
 import org.apache.spark.scheduler.{ExecutorExited, ExecutorLossReason}
 import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.RemoveExecutor
