
Commit 6f7edf8

Merge remote-tracking branch 'upstream/master' into dt-python-consistency
Conflicts: python/pyspark/mllib/tree.py (not a real conflict, merged)
2 parents: a0d7dbe + e537b33


80 files changed: +1225 −491 lines changed


bin/beeline

Lines changed: 7 additions & 22 deletions
@@ -17,29 +17,14 @@
 # limitations under the License.
 #
 
-# Figure out where Spark is installed
-FWDIR="$(cd `dirname $0`/..; pwd)"
+#
+# Shell script for starting BeeLine
 
-# Find the java binary
-if [ -n "${JAVA_HOME}" ]; then
-  RUNNER="${JAVA_HOME}/bin/java"
-else
-  if [ `command -v java` ]; then
-    RUNNER="java"
-  else
-    echo "JAVA_HOME is not set" >&2
-    exit 1
-  fi
-fi
+# Enter posix mode for bash
+set -o posix
 
-# Compute classpath using external script
-classpath_output=$($FWDIR/bin/compute-classpath.sh)
-if [[ "$?" != "0" ]]; then
-  echo "$classpath_output"
-  exit 1
-else
-  CLASSPATH=$classpath_output
-fi
+# Figure out where Spark is installed
+FWDIR="$(cd `dirname $0`/..; pwd)"
 
 CLASS="org.apache.hive.beeline.BeeLine"
-exec "$RUNNER" -cp "$CLASSPATH" $CLASS "$@"
+exec "$FWDIR/bin/spark-class" $CLASS "$@"

bin/spark-sql

Lines changed: 62 additions & 4 deletions
@@ -23,14 +23,72 @@
 # Enter posix mode for bash
 set -o posix
 
+CLASS="org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver"
+
 # Figure out where Spark is installed
 FWDIR="$(cd `dirname $0`/..; pwd)"
 
-if [[ "$@" = *--help ]] || [[ "$@" = *-h ]]; then
-  echo "Usage: ./sbin/spark-sql [options]"
+function usage {
+  echo "Usage: ./sbin/spark-sql [options] [cli option]"
+  pattern="usage"
+  pattern+="\|Spark assembly has been built with Hive"
+  pattern+="\|NOTE: SPARK_PREPEND_CLASSES is set"
+  pattern+="\|Spark Command: "
+  pattern+="\|--help"
+  pattern+="\|======="
+
   $FWDIR/bin/spark-submit --help 2>&1 | grep -v Usage 1>&2
+  echo
+  echo "CLI options:"
+  $FWDIR/bin/spark-class $CLASS --help 2>&1 | grep -v "$pattern" 1>&2
+}
+
+function ensure_arg_number {
+  arg_number=$1
+  at_least=$2
+
+  if [[ $arg_number -lt $at_least ]]; then
+    usage
+    exit 1
+  fi
+}
+
+if [[ "$@" = --help ]] || [[ "$@" = -h ]]; then
+  usage
   exit 0
 fi
 
-CLASS="org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver"
-exec "$FWDIR"/bin/spark-submit --class $CLASS spark-internal $@
+CLI_ARGS=()
+SUBMISSION_ARGS=()
+
+while (($#)); do
+  case $1 in
+    -d | --define | --database | -f | -h | --hiveconf | --hivevar | -i | -p)
+      ensure_arg_number $# 2
+      CLI_ARGS+=($1); shift
+      CLI_ARGS+=($1); shift
+      ;;
+
+    -e)
+      ensure_arg_number $# 2
+      CLI_ARGS+=($1); shift
+      CLI_ARGS+=(\"$1\"); shift
+      ;;
+
+    -s | --silent)
+      CLI_ARGS+=($1); shift
+      ;;
+
+    -v | --verbose)
+      # Both SparkSubmit and SparkSQLCLIDriver recognizes -v | --verbose
+      CLI_ARGS+=($1)
+      SUBMISSION_ARGS+=($1); shift
+      ;;
+
+    *)
+      SUBMISSION_ARGS+=($1); shift
+      ;;
+  esac
+done
+
+eval exec "$FWDIR"/bin/spark-submit --class $CLASS ${SUBMISSION_ARGS[*]} spark-internal ${CLI_ARGS[*]}

core/src/main/scala/org/apache/spark/HttpFileServer.scala

Lines changed: 5 additions & 2 deletions
@@ -23,7 +23,10 @@ import com.google.common.io.Files
 
 import org.apache.spark.util.Utils
 
-private[spark] class HttpFileServer(securityManager: SecurityManager) extends Logging {
+private[spark] class HttpFileServer(
+    securityManager: SecurityManager,
+    requestedPort: Int = 0)
+  extends Logging {
 
   var baseDir : File = null
   var fileDir : File = null
@@ -38,7 +41,7 @@ private[spark] class HttpFileServer(securityManager: SecurityManager) extends Lo
     fileDir.mkdir()
     jarDir.mkdir()
     logInfo("HTTP File server directory is " + baseDir)
-    httpServer = new HttpServer(baseDir, securityManager)
+    httpServer = new HttpServer(baseDir, securityManager, requestedPort, "HTTP file server")
     httpServer.start()
    serverUri = httpServer.uri
    logDebug("HTTP file server started at: " + serverUri)

core/src/main/scala/org/apache/spark/HttpServer.scala

Lines changed: 54 additions & 34 deletions
@@ -21,7 +21,7 @@ import java.io.File
 
 import org.eclipse.jetty.util.security.{Constraint, Password}
 import org.eclipse.jetty.security.authentication.DigestAuthenticator
-import org.eclipse.jetty.security.{ConstraintMapping, ConstraintSecurityHandler, HashLoginService, SecurityHandler}
+import org.eclipse.jetty.security.{ConstraintMapping, ConstraintSecurityHandler, HashLoginService}
 
 import org.eclipse.jetty.server.Server
 import org.eclipse.jetty.server.bio.SocketConnector
@@ -41,48 +41,68 @@ private[spark] class ServerStateException(message: String) extends Exception(mes
  * as well as classes created by the interpreter when the user types in code. This is just a wrapper
  * around a Jetty server.
  */
-private[spark] class HttpServer(resourceBase: File, securityManager: SecurityManager)
-  extends Logging {
+private[spark] class HttpServer(
+    resourceBase: File,
+    securityManager: SecurityManager,
+    requestedPort: Int = 0,
+    serverName: String = "HTTP server")
+  extends Logging {
+
   private var server: Server = null
-  private var port: Int = -1
+  private var port: Int = requestedPort
 
   def start() {
     if (server != null) {
       throw new ServerStateException("Server is already started")
     } else {
       logInfo("Starting HTTP Server")
-      server = new Server()
-      val connector = new SocketConnector
-      connector.setMaxIdleTime(60*1000)
-      connector.setSoLingerTime(-1)
-      connector.setPort(0)
-      server.addConnector(connector)
-
-      val threadPool = new QueuedThreadPool
-      threadPool.setDaemon(true)
-      server.setThreadPool(threadPool)
-      val resHandler = new ResourceHandler
-      resHandler.setResourceBase(resourceBase.getAbsolutePath)
-
-      val handlerList = new HandlerList
-      handlerList.setHandlers(Array(resHandler, new DefaultHandler))
-
-      if (securityManager.isAuthenticationEnabled()) {
-        logDebug("HttpServer is using security")
-        val sh = setupSecurityHandler(securityManager)
-        // make sure we go through security handler to get resources
-        sh.setHandler(handlerList)
-        server.setHandler(sh)
-      } else {
-        logDebug("HttpServer is not using security")
-        server.setHandler(handlerList)
-      }
-
-      server.start()
-      port = server.getConnectors()(0).getLocalPort()
+      val (actualServer, actualPort) =
+        Utils.startServiceOnPort[Server](requestedPort, doStart, serverName)
+      server = actualServer
+      port = actualPort
     }
   }
 
+  /**
+   * Actually start the HTTP server on the given port.
+   *
+   * Note that this is only best effort in the sense that we may end up binding to a nearby port
+   * in the event of port collision. Return the bound server and the actual port used.
+   */
+  private def doStart(startPort: Int): (Server, Int) = {
+    val server = new Server()
+    val connector = new SocketConnector
+    connector.setMaxIdleTime(60 * 1000)
+    connector.setSoLingerTime(-1)
+    connector.setPort(startPort)
+    server.addConnector(connector)
+
+    val threadPool = new QueuedThreadPool
+    threadPool.setDaemon(true)
+    server.setThreadPool(threadPool)
+    val resHandler = new ResourceHandler
+    resHandler.setResourceBase(resourceBase.getAbsolutePath)
+
+    val handlerList = new HandlerList
+    handlerList.setHandlers(Array(resHandler, new DefaultHandler))
+
+    if (securityManager.isAuthenticationEnabled()) {
+      logDebug("HttpServer is using security")
+      val sh = setupSecurityHandler(securityManager)
+      // make sure we go through security handler to get resources
+      sh.setHandler(handlerList)
+      server.setHandler(sh)
+    } else {
+      logDebug("HttpServer is not using security")
+      server.setHandler(handlerList)
+    }
+
+    server.start()
+    val actualPort = server.getConnectors()(0).getLocalPort
+
+    (server, actualPort)
+  }
+
   /**
    * Setup Jetty to the HashLoginService using a single user with our
    * shared secret. Configure it to use DIGEST-MD5 authentication so that the password
@@ -134,7 +154,7 @@ private[spark] class HttpServer(resourceBase: File, securityManager: SecurityMan
     if (server == null) {
       throw new ServerStateException("Server is not started")
     } else {
-      return "http://" + Utils.localIpAddress + ":" + port
+      "http://" + Utils.localIpAddress + ":" + port
     }
   }
 }
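
Note: the change above delegates port binding to Utils.startServiceOnPort, which retries nearby ports when the requested one is already taken. A minimal standalone sketch of that pattern is below; the helper name, signature, and retry count are assumptions for illustration, not Spark's actual utility.

import java.net.BindException

object PortRetrySketch {
  // Try to start a service on `startPort`; on a bind failure, walk to nearby ports.
  // Port 0 means "let the OS pick", so it is simply retried as-is.
  def startServiceOnPort[T](startPort: Int,
                            startService: Int => (T, Int),
                            serviceName: String,
                            maxRetries: Int = 16): (T, Int) = {
    for (offset <- 0 to maxRetries) {
      val tryPort = if (startPort == 0) 0 else (startPort + offset) % 65536
      try {
        return startService(tryPort)
      } catch {
        case _: BindException if offset < maxRetries =>
          println(s"$serviceName could not bind on port $tryPort, trying the next port.")
      }
    }
    throw new BindException(s"$serviceName failed to bind after $maxRetries retries.")
  }
}

Routing doStart through a helper of this shape keeps the Jetty setup in one place while making port collisions non-fatal.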

core/src/main/scala/org/apache/spark/SparkConf.scala

Lines changed: 9 additions & 1 deletion
@@ -323,6 +323,14 @@ private[spark] object SparkConf {
   * the scheduler, while the rest of the spark configs can be inherited from the driver later.
   */
  def isExecutorStartupConf(name: String): Boolean = {
-    isAkkaConf(name) || name.startsWith("spark.akka") || name.startsWith("spark.auth")
+    isAkkaConf(name) ||
+    name.startsWith("spark.akka") ||
+    name.startsWith("spark.auth") ||
+    isSparkPortConf(name)
  }
+
+  /**
+   * Return whether the given config is a Spark port config.
+   */
+  def isSparkPortConf(name: String): Boolean = name.startsWith("spark.") && name.endsWith(".port")
 }
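
A small self-contained check of the predicate added above (copied out of context purely for illustration; inside Spark it lives on the private SparkConf object):

object PortConfCheck extends App {
  // Any "spark.*.port" setting now counts as an executor-startup config
  // and is forwarded to executors when they launch.
  def isSparkPortConf(name: String): Boolean =
    name.startsWith("spark.") && name.endsWith(".port")

  assert(isSparkPortConf("spark.fileserver.port"))
  assert(isSparkPortConf("spark.broadcast.port"))
  assert(!isSparkPortConf("spark.executor.memory"))
  println("isSparkPortConf behaves as expected")
}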

core/src/main/scala/org/apache/spark/SparkEnv.scala

Lines changed: 6 additions & 6 deletions
@@ -22,7 +22,6 @@ import java.net.Socket
 
 import scala.collection.JavaConversions._
 import scala.collection.mutable
-import scala.concurrent.Await
 import scala.util.Properties
 
 import akka.actor._
@@ -151,10 +150,10 @@ object SparkEnv extends Logging {
     val (actorSystem, boundPort) = AkkaUtils.createActorSystem("spark", hostname, port, conf = conf,
       securityManager = securityManager)
 
-    // Bit of a hack: If this is the driver and our port was 0 (meaning bind to any free port),
-    // figure out which port number Akka actually bound to and set spark.driver.port to it.
-    if (isDriver && port == 0) {
-      conf.set("spark.driver.port", boundPort.toString)
+    // Figure out which port Akka actually bound to in case the original port is 0 or occupied.
+    // This is so that we tell the executors the correct port to connect to.
+    if (isDriver) {
+      conf.set("spark.driver.port", boundPort.toString)
     }
 
     // Create an instance of the class named by the given Java system property, or by
@@ -222,7 +221,8 @@ object SparkEnv extends Logging {
 
     val httpFileServer =
       if (isDriver) {
-        val server = new HttpFileServer(securityManager)
+        val fileServerPort = conf.getInt("spark.fileserver.port", 0)
+        val server = new HttpFileServer(securityManager, fileServerPort)
         server.initialize()
         conf.set("spark.fileserver.uri", server.serverUri)
         server

core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala

Lines changed: 2 additions & 1 deletion
@@ -152,7 +152,8 @@ private[broadcast] object HttpBroadcast extends Logging {
 
   private def createServer(conf: SparkConf) {
     broadcastDir = Utils.createTempDir(Utils.getLocalDir(conf))
-    server = new HttpServer(broadcastDir, securityManager)
+    val broadcastPort = conf.getInt("spark.broadcast.port", 0)
+    server = new HttpServer(broadcastDir, securityManager, broadcastPort, "HTTP broadcast server")
     server.start()
     serverUri = server.uri
     logInfo("Broadcast server started at " + serverUri)

core/src/main/scala/org/apache/spark/deploy/Client.scala

Lines changed: 0 additions & 2 deletions
@@ -155,8 +155,6 @@ object Client {
     conf.set("akka.loglevel", driverArgs.logLevel.toString.replace("WARN", "WARNING"))
     Logger.getRootLogger.setLevel(driverArgs.logLevel)
 
-    // TODO: See if we can initialize akka so return messages are sent back using the same TCP
-    // flow. Else, this (sadly) requires the DriverClient be routable from the Master.
     val (actorSystem, _) = AkkaUtils.createActorSystem(
       "driverClient", Utils.localHostName(), 0, conf, new SecurityManager(conf))
 
core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala

Lines changed: 14 additions & 25 deletions
@@ -220,6 +220,7 @@ private[spark] class SparkSubmitArguments(args: Seq[String]) {
   /** Fill in values by parsing user options. */
   private def parseOpts(opts: Seq[String]): Unit = {
     var inSparkOpts = true
+    val EQ_SEPARATED_OPT="""(--[^=]+)=(.+)""".r
 
     // Delineates parsing of Spark options from parsing of user options.
     parse(opts)
@@ -322,33 +323,21 @@ private[spark] class SparkSubmitArguments(args: Seq[String]) {
         verbose = true
         parse(tail)
 
+      case EQ_SEPARATED_OPT(opt, value) :: tail =>
+        parse(opt :: value :: tail)
+
+      case value :: tail if value.startsWith("-") =>
+        SparkSubmit.printErrorAndExit(s"Unrecognized option '$value'.")
+
       case value :: tail =>
-        if (inSparkOpts) {
-          value match {
-            // convert --foo=bar to --foo bar
-            case v if v.startsWith("--") && v.contains("=") && v.split("=").size == 2 =>
-              val parts = v.split("=")
-              parse(Seq(parts(0), parts(1)) ++ tail)
-            case v if v.startsWith("-") =>
-              val errMessage = s"Unrecognized option '$value'."
-              SparkSubmit.printErrorAndExit(errMessage)
-            case v =>
-              primaryResource =
-                if (!SparkSubmit.isShell(v) && !SparkSubmit.isInternal(v)) {
-                  Utils.resolveURI(v).toString
-                } else {
-                  v
-                }
-              inSparkOpts = false
-              isPython = SparkSubmit.isPython(v)
-              parse(tail)
+        primaryResource =
+          if (!SparkSubmit.isShell(value) && !SparkSubmit.isInternal(value)) {
+            Utils.resolveURI(value).toString
+          } else {
+            value
           }
-        } else {
-          if (!value.isEmpty) {
-            childArgs += value
-          }
-          parse(tail)
-        }
+        isPython = SparkSubmit.isPython(value)
+        childArgs ++= tail
 
       case Nil =>
     }
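
The new EQ_SEPARATED_OPT extractor replaces the ad-hoc split("=") handling: a --name=value token is rewritten into two tokens and re-parsed. A quick standalone illustration of how that regex extractor behaves (the demo object is hypothetical):

object EqSeparatedOptDemo extends App {
  // Same pattern as in the diff: captures the option name and everything after '='.
  val EQ_SEPARATED_OPT = """(--[^=]+)=(.+)""".r

  Seq("--master=local[4]", "--name=my app", "--verbose") foreach {
    case EQ_SEPARATED_OPT(opt, value) => println(s"split: $opt / $value")
    case other                        => println(s"left as-is: $other")
  }
  // split: --master / local[4]
  // split: --name / my app
  // left as-is: --verbose
}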

core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala

Lines changed: 1 addition & 1 deletion
@@ -28,7 +28,7 @@ import org.apache.spark.util.AkkaUtils
  */
 private[spark]
 class MasterWebUI(val master: Master, requestedPort: Int)
-  extends WebUI(master.securityMgr, requestedPort, master.conf) with Logging {
+  extends WebUI(master.securityMgr, requestedPort, master.conf, name = "MasterUI") with Logging {
 
   val masterActorRef = master.self
   val timeout = AkkaUtils.askTimeout(master.conf)
