[SPARK-22959][PYTHON] Configuration to select the modules for daemon and worker in PySpark #20151
Closed

HyukjinKwon wants to merge 3 commits into apache:master from HyukjinKwon:configuration-daemon-worker
@@ -34,17 +34,39 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String

   import PythonWorkerFactory._

-  // Because forking processes from Java is expensive, we prefer to launch a single Python daemon
-  // (pyspark/daemon.py) and tell it to fork new workers for our tasks. This daemon currently
-  // only works on UNIX-based systems now because it uses signals for child management, so we can
-  // also fall back to launching workers (pyspark/worker.py) directly.
+  // Because forking processes from Java is expensive, we prefer to launch a single Python daemon,
+  // pyspark/daemon.py (by default) and tell it to fork new workers for our tasks. This daemon
+  // currently only works on UNIX-based systems now because it uses signals for child management,
+  // so we can also fall back to launching workers, pyspark/worker.py (by default) directly.
   val useDaemon = {
     val useDaemonEnabled = SparkEnv.get.conf.getBoolean("spark.python.use.daemon", true)

     // This flag is ignored on Windows as it's unable to fork.
     !System.getProperty("os.name").startsWith("Windows") && useDaemonEnabled
   }

+  // WARN: Both configurations, 'spark.python.daemon.module' and 'spark.python.worker.module' are
+  // for very advanced users and they are experimental. This should be considered
+  // as expert-only option, and shouldn't be used before knowing what it means exactly.
+
+  // This configuration indicates the module to run the daemon to execute its Python workers.
+  val daemonModule = SparkEnv.get.conf.getOption("spark.python.daemon.module").map { value =>
+    logInfo(
+      s"Python daemon module in PySpark is set to [$value] in 'spark.python.daemon.module', " +
+        "using this to start the daemon up. Note that this configuration only has an effect when " +
+        "'spark.python.use.daemon' is enabled and the platform is not Windows.")
+    value
+  }.getOrElse("pyspark.daemon")
+
+  // This configuration indicates the module to run each Python worker.
+  val workerModule = SparkEnv.get.conf.getOption("spark.python.worker.module").map { value =>
+    logInfo(
+      s"Python worker module in PySpark is set to [$value] in 'spark.python.worker.module', " +
+        "using this to start the worker up. Note that this configuration only has an effect when " +
+        "'spark.python.use.daemon' is disabled or the platform is Windows.")
+    value
+  }.getOrElse("pyspark.worker")
+
   var daemon: Process = null
   val daemonHost = InetAddress.getByAddress(Array(127, 0, 0, 1))
   var daemonPort: Int = 0

Inline review comment on the logInfo for 'spark.python.daemon.module': Just double checked it shows the log only when the configuration is explicitly set:
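For context, a hedged usage sketch that is not part of this diff: the two new options behave like any other Spark configuration, so a PySpark application could point them at replacement modules before the context starts. The module names below are hypothetical stand-ins for whatever expert-provided modules are importable on the executors' Python path.

# Hedged usage sketch; "my_project.custom_daemon" and "my_project.custom_worker" are
# hypothetical modules, not shipped with Spark.
from pyspark import SparkConf, SparkContext

conf = (SparkConf()
        .setAppName("custom-pyspark-modules")
        # Only takes effect when 'spark.python.use.daemon' is enabled (non-Windows).
        .set("spark.python.daemon.module", "my_project.custom_daemon")
        # Only takes effect when the daemon is disabled or the platform is Windows.
        .set("spark.python.worker.module", "my_project.custom_worker"))
sc = SparkContext(conf=conf)

The same values could equally be passed with --conf on spark-submit, since PythonWorkerFactory reads them from SparkEnv's configuration on the executor side.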
@@ -74,8 +96,9 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String
   }

   /**
-   * Connect to a worker launched through pyspark/daemon.py, which forks python processes itself
-   * to avoid the high cost of forking from Java. This currently only works on UNIX-based systems.
+   * Connect to a worker launched through pyspark/daemon.py (by default), which forks python
+   * processes itself to avoid the high cost of forking from Java. This currently only works
+   * on UNIX-based systems.
    */
   private def createThroughDaemon(): Socket = {
@@ -108,15 +131,15 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String
   }

   /**
-   * Launch a worker by executing worker.py directly and telling it to connect to us.
+   * Launch a worker by executing worker.py (by default) directly and telling it to connect to us.
    */
   private def createSimpleWorker(): Socket = {
     var serverSocket: ServerSocket = null
     try {
       serverSocket = new ServerSocket(0, 1, InetAddress.getByAddress(Array(127, 0, 0, 1)))

       // Create and start the worker
-      val pb = new ProcessBuilder(Arrays.asList(pythonExec, "-m", "pyspark.worker"))
+      val pb = new ProcessBuilder(Arrays.asList(pythonExec, "-m", workerModule))
       val workerEnv = pb.environment()
       workerEnv.putAll(envVars.asJava)
       workerEnv.put("PYTHONPATH", pythonPath)
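Because createSimpleWorker now launches the worker with "pythonExec -m workerModule", a replacement worker module only needs to be importable and to behave like pyspark.worker when run as a main module. Below is a minimal, hedged sketch of such a module, assuming the safest route is to delegate to the stock worker so the socket handshake with the JVM stays untouched; the module and hook names are made up.

# my_project/custom_worker.py -- hedged sketch of a drop-in replacement for pyspark.worker.
# The JVM starts it as "python -m my_project.custom_worker"; this variant only adds a
# hypothetical hook and then hands control to the stock worker via runpy.
import runpy


def before_worker_starts():
    # Hypothetical hook: configure logging, warm caches, set resource limits, etc.
    pass


if __name__ == "__main__":
    before_worker_starts()
    # Re-run pyspark.worker exactly as if it had been launched with "python -m pyspark.worker".
    runpy.run_module("pyspark.worker", run_name="__main__", alter_sys=True)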
@@ -159,7 +182,7 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String
     try {
       // Create and start the daemon
-      val pb = new ProcessBuilder(Arrays.asList(pythonExec, "-m", "pyspark.daemon"))
+      val pb = new ProcessBuilder(Arrays.asList(pythonExec, "-m", daemonModule))
       val workerEnv = pb.environment()
       workerEnv.putAll(envVars.asJava)
       workerEnv.put("PYTHONPATH", pythonPath)
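The daemon side is analogous: createDaemon now runs "pythonExec -m daemonModule", and a replacement has to honour the same contract as pyspark/daemon.py, which reports its listening port on stdout and forks a worker per connection. A hedged sketch that delegates to the stock daemon rather than reimplementing that contract follows; the module name is hypothetical.

# my_project/custom_daemon.py -- hedged sketch; it delegates to pyspark.daemon so the
# port reporting and fork-per-worker behaviour stay exactly as Spark expects.
import runpy

if __name__ == "__main__":
    # Any expert-only setup could go here, before the stock daemon takes over the process.
    runpy.run_module("pyspark.daemon", run_name="__main__", alter_sys=True)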
Review comment: Do we need to restrict the module's package to only allow something like pyspark.*?

Reply: Hm, actually we could also check whether it's an empty string. I wrote "shouldn't be used before knowing what it means exactly." above, so I think it's fine.