Skip to content

[SPARK-2313] PySpark pass port rather than stdin #3424

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,16 @@ object SparkSubmit {
if (args.isPython) {
if (args.primaryResource == PYSPARK_SHELL) {
args.mainClass = "py4j.GatewayServer"
args.childArgs = ArrayBuffer("--die-on-broken-pipe", "0")

var gateway_port = 0;
// extract gateway_port from childArgs
for (i <- 1 until args.childArgs.length){
if (args.childArgs(i-1) == "--gateway_port"){
gateway_port = args.childArgs(i).toInt
}
}

args.childArgs = ArrayBuffer("--die-on-broken-pipe", gateway_port.toString)
} else {
// If a python file is provided, add it to the child arguments and list of files to deploy.
// Usage: PythonAppRunner <main python file> <extra python files> [app arguments]
Expand Down
38 changes: 35 additions & 3 deletions python/pyspark/java_gateway.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,31 @@
import sys
import signal
import shlex
import socket
import platform
from subprocess import Popen, PIPE
from threading import Thread
from py4j.java_gateway import java_import, JavaGateway, GatewayClient

def peek_free_port(port_start=10000, max_attempts=128):
    """
    Check and return an available port number for binding.
    The port number is generated with a hint from PID to avoid possible concurrent issues.

    Candidates are probed sequentially starting from the PID-derived port and
    wrap around inside ``port_start`` ~ 65535, so a candidate never exceeds the
    valid TCP port range.

    Note that the probing socket is closed before returning, so the port is
    only known to have been free at probe time; another process may still
    grab it before the caller binds it.

    :param port_start: lowest port number to consider (1 ~ 65535)
    :param max_attempts: maximum number of candidate ports to probe
    :return: available port or 0 if no ports available
    :raises ValueError: if port_start is outside 1 ~ 65535
    """
    max_port = 0xFFFF
    if not 0 < port_start <= max_port:
        raise ValueError("port_start must be in 1..65535, got %r" % (port_start,))

    # Number of ports in [port_start, max_port]; the PID only picks the
    # starting position inside that window.
    span = max_port - port_start + 1
    first = os.getpid() % span

    # Probing more than `span` candidates would only retry the same ports.
    for offset in range(min(max_attempts, span)):
        # Wrap around instead of running past 65535: bind() raises
        # OverflowError (not socket.error) for out-of-range ports.
        candidate = port_start + (first + offset) % span
        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            s.bind(("0.0.0.0", candidate))
            bound = True
        except socket.error:
            bound = False
        finally:
            # Always release the probing socket, on success and on failure.
            s.close()
        if bound:
            return candidate
    return 0  # maximum number of attempts reached, let's leave it to Py4j

def launch_gateway():
SPARK_HOME = os.environ["SPARK_HOME"]
Expand All @@ -41,6 +61,11 @@ def launch_gateway():
submit_args = submit_args if submit_args is not None else ""
submit_args = shlex.split(submit_args)
command = [os.path.join(SPARK_HOME, script)] + submit_args + ["pyspark-shell"]

gateway_port_candidate = peek_free_port()
if gateway_port_candidate>0:
command += ["--gateway_port", "%d" % gateway_port_candidate]

if not on_windows:
# Don't send ctrl-c / SIGINT to the Java gateway:
def preexec_func():
Expand All @@ -51,11 +76,13 @@ def preexec_func():
else:
# preexec_fn not supported on Windows
proc = Popen(command, stdout=PIPE, stdin=PIPE)

try:
# Determine which ephemeral port the server started on:
# Determine which ephemeral port the server started on,
# or double check if gateway_port_candidate is passed in:
gateway_port = proc.stdout.readline()
gateway_port = int(gateway_port)
if gateway_port_candidate > 0 and gateway_port != gateway_port_candidate:
print "Warning, gateway_port_candidate != gateway_port, possible concurrent issues: %r != %r" % (gateway_port_candidate, gateway_port)
except ValueError:
# Grab the remaining lines of stdout
(stdout, _) = proc.communicate()
Expand All @@ -70,7 +97,12 @@ def preexec_func():
error_msg += "--------------------------------------------------------------\n"
error_msg += gateway_port + stdout
error_msg += "--------------------------------------------------------------\n"
raise Exception(error_msg)
if gateway_port_candidate > 0:
print "Warning, parse gateway_port failed, Caused by:\n" + error_msg
print "However, we have a chance to assume gateway_port_candidate is used. Suppress the exception and give it a try..."
gateway_port = gateway_port_candidate
else:
raise Exception(error_msg)

# In Windows, ensure the Java child processes do not linger after Python has exited.
# In UNIX-based systems, the child process can kill itself on broken pipe (i.e. when
Expand Down