
[SPARK-22959][PYTHON] Configuration to select the modules for daemon and worker in PySpark #20151


Closed
@@ -34,17 +34,39 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String

import PythonWorkerFactory._

// Because forking processes from Java is expensive, we prefer to launch a single Python daemon
// (pyspark/daemon.py) and tell it to fork new workers for our tasks. This daemon currently
// only works on UNIX-based systems now because it uses signals for child management, so we can
// also fall back to launching workers (pyspark/worker.py) directly.
// Because forking processes from Java is expensive, we prefer to launch a single Python daemon,
// pyspark/daemon.py (by default), and tell it to fork new workers for our tasks. This daemon
// currently only works on UNIX-based systems because it uses signals for child management,
// so we can also fall back to launching workers, pyspark/worker.py (by default), directly.
val useDaemon = {
val useDaemonEnabled = SparkEnv.get.conf.getBoolean("spark.python.use.daemon", true)

// This flag is ignored on Windows as it's unable to fork.
!System.getProperty("os.name").startsWith("Windows") && useDaemonEnabled
}
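
As context for the flag above, here is a minimal sketch (not part of this diff) of disabling the daemon explicitly via 'spark.python.use.daemon'. The application name and master URL are placeholders, and in practice the flag would usually be passed via spark-submit --conf or PySpark's own SparkConf rather than from Scala:

import org.apache.spark.{SparkConf, SparkContext}

object NoDaemonSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("no-daemon-sketch")   // placeholder app name
      .setMaster("local[*]")            // placeholder master URL
      // Skip the daemon and launch the worker module directly for each worker.
      // On Windows this is effectively always the behaviour, since fork() is unavailable.
      .set("spark.python.use.daemon", "false")
    val sc = new SparkContext(conf)
    // Python workers for this application would now be launched one process per
    // worker instead of being forked from a shared daemon.
    sc.stop()
  }
}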

// WARN: Both configurations, 'spark.python.daemon.module' and 'spark.python.worker.module', are
// experimental and intended for very advanced users. They should be considered expert-only
// options and shouldn't be used without knowing exactly what they do.

// This configuration indicates the module used to run the daemon that executes the Python workers.
val daemonModule = SparkEnv.get.conf.getOption("spark.python.daemon.module").map { value =>
Member commented:

Do we need to restrict the module's package to only allow something like pyspark.*?

Member Author replied:

Hm, actually we could also check whether it's an empty string, for example. But I wrote "shouldn't be used before knowing what it means exactly" above, so I think it's fine.
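
For illustration of the validation being discussed (the PR itself deliberately adds no such check), a hedged sketch of what rejecting empty or non-pyspark module names could look like; the helper name is hypothetical:

object ModuleConfigValidationSketch {
  // Hypothetical check, not added by this PR: reject empty values and,
  // optionally, anything outside the pyspark package, as suggested above.
  def validated(key: String, value: String): String = {
    require(value.trim.nonEmpty, s"'$key' must not be an empty string.")
    // Optional stricter rule from the review question:
    // require(value.startsWith("pyspark."), s"'$key' must name a pyspark.* module.")
    value
  }
}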

logInfo(
s"Python daemon module in PySpark is set to [$value] in 'spark.python.daemon.module', " +
"using this to start the daemon up. Note that this configuration only has an effect when " +
"'spark.python.use.daemon' is enabled and the platform is not Windows.")
Member Author commented:

Just double-checked: it shows the log only when the configuration is explicitly set:

18/01/10 21:23:24 INFO PythonWorkerFactory: Python daemon module in PySpark is set to [pyspark.daemon] in 'spark.python.daemon.module', using this to start the daemon up. Note that this configuration only has an effect when 'spark.python.use.daemon' is enabled and the platform is not Windows.

value
}.getOrElse("pyspark.daemon")

// This configuration indicates the module to run each Python worker.
val workerModule = SparkEnv.get.conf.getOption("spark.python.worker.module").map { value =>
logInfo(
s"Python worker module in PySpark is set to [$value] in 'spark.python.worker.module', " +
"using this to start the worker up. Note that this configuration only has an effect when " +
"'spark.python.use.daemon' is disabled or the platform is Windows.")
value
}.getOrElse("pyspark.worker")
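
To make these two settings concrete, here is a minimal sketch (not part of this diff) of pointing PySpark at replacement modules. The module names are hypothetical; as the warning above says, a custom module must be importable on the workers' PYTHONPATH and speak the same protocol as pyspark.daemon / pyspark.worker. Note also, per the review thread above, that the INFO log lines are emitted only when these keys are explicitly set.

import org.apache.spark.SparkConf

object CustomPythonModulesSketch {
  def main(args: Array[String]): Unit = {
    // "my_pkg.custom_daemon" and "my_pkg.custom_worker" are hypothetical names,
    // used only to show the configuration keys introduced by this change.
    val conf = new SparkConf()
      // Read only when the daemon is in use (daemon enabled and not on Windows).
      .set("spark.python.daemon.module", "my_pkg.custom_daemon")
      // Read only when the daemon is not in use (daemon disabled, or on Windows).
      .set("spark.python.worker.module", "my_pkg.custom_worker")
    println(conf.toDebugString)  // prints the configured keys and values
  }
}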

var daemon: Process = null
val daemonHost = InetAddress.getByAddress(Array(127, 0, 0, 1))
var daemonPort: Int = 0
@@ -74,8 +96,9 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String
}

/**
* Connect to a worker launched through pyspark/daemon.py, which forks python processes itself
* to avoid the high cost of forking from Java. This currently only works on UNIX-based systems.
* Connect to a worker launched through pyspark/daemon.py (by default), which forks python
* processes itself to avoid the high cost of forking from Java. This currently only works
* on UNIX-based systems.
*/
private def createThroughDaemon(): Socket = {

@@ -108,15 +131,15 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String
}

/**
* Launch a worker by executing worker.py directly and telling it to connect to us.
* Launch a worker by executing worker.py (by default) directly and telling it to connect to us.
*/
private def createSimpleWorker(): Socket = {
var serverSocket: ServerSocket = null
try {
serverSocket = new ServerSocket(0, 1, InetAddress.getByAddress(Array(127, 0, 0, 1)))

// Create and start the worker
val pb = new ProcessBuilder(Arrays.asList(pythonExec, "-m", "pyspark.worker"))
val pb = new ProcessBuilder(Arrays.asList(pythonExec, "-m", workerModule))
val workerEnv = pb.environment()
workerEnv.putAll(envVars.asJava)
workerEnv.put("PYTHONPATH", pythonPath)
@@ -159,7 +182,7 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String

try {
// Create and start the daemon
val pb = new ProcessBuilder(Arrays.asList(pythonExec, "-m", "pyspark.daemon"))
val pb = new ProcessBuilder(Arrays.asList(pythonExec, "-m", daemonModule))
val workerEnv = pb.environment()
workerEnv.putAll(envVars.asJava)
workerEnv.put("PYTHONPATH", pythonPath)