Commit 8380064

Merge branch 'master' of github.com:apache/spark into streaming

Conflicts:
    python/docs/modules.rst
    python/run-tests

2 parents: 52c535b + 69c3f44

146 files changed, +3269 -1514 lines


bin/compute-classpath.cmd

Lines changed: 7 additions & 1 deletion

@@ -36,7 +36,13 @@ rem Load environment variables from conf\spark-env.cmd, if it exists
 if exist "%FWDIR%conf\spark-env.cmd" call "%FWDIR%conf\spark-env.cmd"
 
 rem Build up classpath
-set CLASSPATH=%SPARK_CLASSPATH%;%SPARK_SUBMIT_CLASSPATH%;%FWDIR%conf
+set CLASSPATH=%SPARK_CLASSPATH%;%SPARK_SUBMIT_CLASSPATH%
+
+if "x%SPARK_CONF_DIR%"!="x" (
+  set CLASSPATH=%CLASSPATH%;%SPARK_CONF_DIR%
+) else (
+  set CLASSPATH=%CLASSPATH%;%FWDIR%conf
+)
 
 if exist "%FWDIR%RELEASE" (
   for %%d in ("%FWDIR%lib\spark-assembly*.jar") do (

bin/compute-classpath.sh

Lines changed: 7 additions & 1 deletion

@@ -27,8 +27,14 @@ FWDIR="$(cd "`dirname "$0"`"/..; pwd)"
 
 . "$FWDIR"/bin/load-spark-env.sh
 
+CLASSPATH="$SPARK_CLASSPATH:$SPARK_SUBMIT_CLASSPATH"
+
 # Build up classpath
-CLASSPATH="$SPARK_CLASSPATH:$SPARK_SUBMIT_CLASSPATH:$FWDIR/conf"
+if [ -n "$SPARK_CONF_DIR" ]; then
+  CLASSPATH="$CLASSPATH:$SPARK_CONF_DIR"
+else
+  CLASSPATH="$CLASSPATH:$FWDIR/conf"
+fi
 
 ASSEMBLY_DIR="$FWDIR/assembly/target/scala-$SCALA_VERSION"

bin/pyspark

Lines changed: 12 additions & 12 deletions

@@ -52,10 +52,20 @@ fi
 
 # Figure out which Python executable to use
 if [[ -z "$PYSPARK_PYTHON" ]]; then
-  PYSPARK_PYTHON="python"
+  if [[ "$IPYTHON" = "1" || -n "$IPYTHON_OPTS" ]]; then
+    # for backward compatibility
+    PYSPARK_PYTHON="ipython"
+  else
+    PYSPARK_PYTHON="python"
+  fi
 fi
 export PYSPARK_PYTHON
 
+if [[ -z "$PYSPARK_PYTHON_OPTS" && -n "$IPYTHON_OPTS" ]]; then
+  # for backward compatibility
+  PYSPARK_PYTHON_OPTS="$IPYTHON_OPTS"
+fi
+
 # Add the PySpark classes to the Python path:
 export PYTHONPATH="$SPARK_HOME/python/:$PYTHONPATH"
 export PYTHONPATH="$SPARK_HOME/python/lib/py4j-0.8.2.1-src.zip:$PYTHONPATH"
@@ -64,11 +74,6 @@ export PYTHONPATH="$SPARK_HOME/python/lib/py4j-0.8.2.1-src.zip:$PYTHONPATH"
 export OLD_PYTHONSTARTUP="$PYTHONSTARTUP"
 export PYTHONSTARTUP="$FWDIR/python/pyspark/shell.py"
 
-# If IPython options are specified, assume user wants to run IPython
-if [[ -n "$IPYTHON_OPTS" ]]; then
-  IPYTHON=1
-fi
-
 # Build up arguments list manually to preserve quotes and backslashes.
 # We export Spark submit arguments as an environment variable because shell.py must run as a
 # PYTHONSTARTUP script, which does not take in arguments. This is required for IPython notebooks.
@@ -106,10 +111,5 @@ if [[ "$1" =~ \.py$ ]]; then
 else
   # PySpark shell requires special handling downstream
   export PYSPARK_SHELL=1
-  # Only use ipython if no command line arguments were provided [SPARK-1134]
-  if [[ "$IPYTHON" = "1" ]]; then
-    exec ${PYSPARK_PYTHON:-ipython} $IPYTHON_OPTS
-  else
-    exec "$PYSPARK_PYTHON"
-  fi
+  exec "$PYSPARK_PYTHON" $PYSPARK_PYTHON_OPTS
 fi

bin/pyspark2.cmd

Lines changed: 1 addition & 1 deletion

@@ -33,7 +33,7 @@ for %%d in ("%FWDIR%assembly\target\scala-%SCALA_VERSION%\spark-assembly*hadoop*
 )
 if [%FOUND_JAR%] == [0] (
   echo Failed to find Spark assembly JAR.
-  echo You need to build Spark with sbt\sbt assembly before running this program.
+  echo You need to build Spark before running this program.
   goto exit
 )
 :skip_build_test

bin/run-example2.cmd

Lines changed: 1 addition & 1 deletion

@@ -52,7 +52,7 @@ if exist "%FWDIR%RELEASE" (
 )
 if "x%SPARK_EXAMPLES_JAR%"=="x" (
   echo Failed to find Spark examples assembly JAR.
-  echo You need to build Spark with sbt\sbt assembly before running this program.
+  echo You need to build Spark before running this program.
   goto exit
 )

bin/spark-class

Lines changed: 1 addition & 1 deletion

@@ -146,7 +146,7 @@ fi
 if [[ "$1" =~ org.apache.spark.tools.* ]]; then
   if test -z "$SPARK_TOOLS_JAR"; then
     echo "Failed to find Spark Tools Jar in $FWDIR/tools/target/scala-$SCALA_VERSION/" 1>&2
-    echo "You need to build spark before running $1." 1>&2
+    echo "You need to build Spark before running $1." 1>&2
     exit 1
   fi
   CLASSPATH="$CLASSPATH:$SPARK_TOOLS_JAR"

bin/spark-class2.cmd

Lines changed: 1 addition & 1 deletion

@@ -104,7 +104,7 @@ for %%d in ("%FWDIR%assembly\target\scala-%SCALA_VERSION%\spark-assembly*hadoop*
 )
 if "%FOUND_JAR%"=="0" (
   echo Failed to find Spark assembly JAR.
-  echo You need to build Spark with sbt\sbt assembly before running this program.
+  echo You need to build Spark before running this program.
   goto exit
 )
 :skip_build_test

bin/utils.sh

Lines changed: 1 addition & 1 deletion

@@ -17,7 +17,7 @@
 # limitations under the License.
 #
 
-# Gather all all spark-submit options into SUBMISSION_OPTS
+# Gather all spark-submit options into SUBMISSION_OPTS
 function gatherSparkSubmitOpts() {
 
   if [ -z "$SUBMIT_USAGE_FUNCTION" ]; then

core/src/main/scala/org/apache/spark/SecurityManager.scala

Lines changed: 2 additions & 3 deletions

@@ -103,10 +103,9 @@ import org.apache.spark.deploy.SparkHadoopUtil
  * and a Server, so for a particular connection is has to determine what to do.
  * A ConnectionId was added to be able to track connections and is used to
  * match up incoming messages with connections waiting for authentication.
- * If its acting as a client and trying to send a message to another ConnectionManager,
- * it blocks the thread calling sendMessage until the SASL negotiation has occurred.
  * The ConnectionManager tracks all the sendingConnections using the ConnectionId
- * and waits for the response from the server and does the handshake.
+ * and waits for the response from the server and does the handshake before sending
+ * the real message.
  *
  * - HTTP for the Spark UI -> the UI was changed to use servlets so that javax servlet filters
  *   can be used. Yarn requires a specific AmIpFilter be installed for security to work

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 42 additions & 22 deletions

@@ -187,6 +187,15 @@ class SparkContext(config: SparkConf) extends Logging {
   val master = conf.get("spark.master")
   val appName = conf.get("spark.app.name")
 
+  private[spark] val isEventLogEnabled = conf.getBoolean("spark.eventLog.enabled", false)
+  private[spark] val eventLogDir: Option[String] = {
+    if (isEventLogEnabled) {
+      Some(conf.get("spark.eventLog.dir", EventLoggingListener.DEFAULT_LOG_DIR).stripSuffix("/"))
+    } else {
+      None
+    }
+  }
+
   // Generate the random name for a temp folder in Tachyon
   // Add a timestamp as the suffix here to make it more safe
   val tachyonFolderName = "spark-" + randomUUID.toString()
@@ -200,6 +209,7 @@ class SparkContext(config: SparkConf) extends Logging {
   private[spark] val listenerBus = new LiveListenerBus
 
   // Create the Spark execution environment (cache, map output tracker, etc)
+  conf.set("spark.executor.id", "driver")
   private[spark] val env = SparkEnv.create(
     conf,
     "<driver>",
@@ -232,19 +242,6 @@ class SparkContext(config: SparkConf) extends Logging {
   /** A default Hadoop Configuration for the Hadoop code (e.g. file systems) that we reuse. */
   val hadoopConfiguration = SparkHadoopUtil.get.newConfiguration(conf)
 
-  // Optionally log Spark events
-  private[spark] val eventLogger: Option[EventLoggingListener] = {
-    if (conf.getBoolean("spark.eventLog.enabled", false)) {
-      val logger = new EventLoggingListener(appName, conf, hadoopConfiguration)
-      logger.start()
-      listenerBus.addListener(logger)
-      Some(logger)
-    } else None
-  }
-
-  // At this point, all relevant SparkListeners have been registered, so begin releasing events
-  listenerBus.start()
-
   val startTime = System.currentTimeMillis()
 
   // Add each JAR given through the constructor
@@ -309,6 +306,29 @@ class SparkContext(config: SparkConf) extends Logging {
   // constructor
   taskScheduler.start()
 
+  val applicationId: String = taskScheduler.applicationId()
+  conf.set("spark.app.id", applicationId)
+
+  val metricsSystem = env.metricsSystem
+
+  // The metrics system for Driver need to be set spark.app.id to app ID.
+  // So it should start after we get app ID from the task scheduler and set spark.app.id.
+  metricsSystem.start()
+
+  // Optionally log Spark events
+  private[spark] val eventLogger: Option[EventLoggingListener] = {
+    if (isEventLogEnabled) {
+      val logger =
+        new EventLoggingListener(applicationId, eventLogDir.get, conf, hadoopConfiguration)
+      logger.start()
+      listenerBus.addListener(logger)
+      Some(logger)
+    } else None
+  }
+
+  // At this point, all relevant SparkListeners have been registered, so begin releasing events
+  listenerBus.start()
+
   private[spark] val cleaner: Option[ContextCleaner] = {
     if (conf.getBoolean("spark.cleaner.referenceTracking", true)) {
       Some(new ContextCleaner(this))
@@ -411,8 +431,8 @@ class SparkContext(config: SparkConf) extends Logging {
   // Post init
   taskScheduler.postStartHook()
 
-  private val dagSchedulerSource = new DAGSchedulerSource(this.dagScheduler, this)
-  private val blockManagerSource = new BlockManagerSource(SparkEnv.get.blockManager, this)
+  private val dagSchedulerSource = new DAGSchedulerSource(this.dagScheduler)
+  private val blockManagerSource = new BlockManagerSource(SparkEnv.get.blockManager)
 
   private def initDriverMetrics() {
     SparkEnv.get.metricsSystem.registerSource(dagSchedulerSource)
@@ -759,20 +779,20 @@ class SparkContext(config: SparkConf) extends Logging {
   /**
    * Create an [[org.apache.spark.Accumulable]] shared variable, to which tasks can add values
    * with `+=`. Only the driver can access the accumuable's `value`.
-   * @tparam T accumulator type
-   * @tparam R type that can be added to the accumulator
+   * @tparam R accumulator result type
+   * @tparam T type that can be added to the accumulator
    */
-  def accumulable[T, R](initialValue: T)(implicit param: AccumulableParam[T, R]) =
+  def accumulable[R, T](initialValue: R)(implicit param: AccumulableParam[R, T]) =
     new Accumulable(initialValue, param)
 
   /**
    * Create an [[org.apache.spark.Accumulable]] shared variable, with a name for display in the
   * Spark UI. Tasks can add values to the accumuable using the `+=` operator. Only the driver can
   * access the accumuable's `value`.
-   * @tparam T accumulator type
-   * @tparam R type that can be added to the accumulator
+   * @tparam R accumulator result type
+   * @tparam T type that can be added to the accumulator
   */
-  def accumulable[T, R](initialValue: T, name: String)(implicit param: AccumulableParam[T, R]) =
+  def accumulable[R, T](initialValue: R, name: String)(implicit param: AccumulableParam[R, T]) =
     new Accumulable(initialValue, param, Some(name))
 
   /**
@@ -1278,7 +1298,7 @@ class SparkContext(config: SparkConf) extends Logging {
   private def postApplicationStart() {
     // Note: this code assumes that the task scheduler has been initialized and has contacted
     // the cluster manager to get an application ID (in case the cluster manager provides one).
-    listenerBus.post(SparkListenerApplicationStart(appName, taskScheduler.applicationId(),
+    listenerBus.post(SparkListenerApplicationStart(appName, Some(applicationId),
       startTime, sparkUser))
   }

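The `accumulable` change above swaps the type parameters so that `R` is the accumulated result type and `T` is the type of values tasks add, matching `AccumulableParam[R, T]`. A minimal sketch of how a call site reads with the corrected ordering, assuming the 1.x `SparkContext`/`AccumulableParam` API shown in this diff; the `StringSetParam` object and the local-mode setup are illustrative, not part of the commit:

    import org.apache.spark.{AccumulableParam, SparkConf, SparkContext}

    // Illustrative AccumulableParam: R = Set[String] (the result), T = String (what tasks add).
    object StringSetParam extends AccumulableParam[Set[String], String] {
      def addAccumulator(acc: Set[String], elem: String): Set[String] = acc + elem
      def addInPlace(r1: Set[String], r2: Set[String]): Set[String] = r1 ++ r2
      def zero(initialValue: Set[String]): Set[String] = initialValue
    }

    object AccumulableSketch {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("accumulable-sketch"))
        // With the corrected signature the first type parameter is the result type R,
        // the second is the added-value type T: accumulable[R, T].
        val seen = sc.accumulable[Set[String], String](Set.empty[String])(StringSetParam)
        sc.parallelize(Seq("a", "b", "a")).foreach(word => seen += word)
        println(seen.value) // only the driver may read the value, e.g. Set(a, b)
        sc.stop()
      }
    }
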
core/src/main/scala/org/apache/spark/SparkEnv.scala

Lines changed: 6 additions & 2 deletions

@@ -268,11 +268,15 @@ object SparkEnv extends Logging {
     }
 
     val metricsSystem = if (isDriver) {
+      // Don't start metrics system right now for Driver.
+      // We need to wait for the task scheduler to give us an app ID.
+      // Then we can start the metrics system.
      MetricsSystem.createMetricsSystem("driver", conf, securityManager)
     } else {
-      MetricsSystem.createMetricsSystem("executor", conf, securityManager)
+      val ms = MetricsSystem.createMetricsSystem("executor", conf, securityManager)
+      ms.start()
+      ms
     }
-    metricsSystem.start()
 
     // Set the sparkFiles directory, used when downloading dependencies. In local mode,
     // this is a temporary directory; in distributed mode, this is the executor's current working

core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala

Lines changed: 15 additions & 10 deletions

@@ -163,18 +163,23 @@ private[broadcast] object HttpBroadcast extends Logging {
 
   private def write(id: Long, value: Any) {
     val file = getFile(id)
-    val out: OutputStream = {
-      if (compress) {
-        compressionCodec.compressedOutputStream(new FileOutputStream(file))
-      } else {
-        new BufferedOutputStream(new FileOutputStream(file), bufferSize)
+    val fileOutputStream = new FileOutputStream(file)
+    try {
+      val out: OutputStream = {
+        if (compress) {
+          compressionCodec.compressedOutputStream(fileOutputStream)
+        } else {
+          new BufferedOutputStream(fileOutputStream, bufferSize)
+        }
       }
+      val ser = SparkEnv.get.serializer.newInstance()
+      val serOut = ser.serializeStream(out)
+      serOut.writeObject(value)
+      serOut.close()
+      files += file
+    } finally {
+      fileOutputStream.close()
     }
-    val ser = SparkEnv.get.serializer.newInstance()
-    val serOut = ser.serializeStream(out)
-    serOut.writeObject(value)
-    serOut.close()
-    files += file
   }
 
   private def read[T: ClassTag](id: Long): T = {

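The restructuring above exists so the underlying `FileOutputStream` is always closed, even when building the wrapping stream or serializing throws. A minimal standalone sketch of the same pattern, with a hypothetical `writeTo` callback standing in for the serializer stream used in `HttpBroadcast.write`:

    import java.io.{BufferedOutputStream, FileOutputStream, OutputStream}

    object SafeWriteSketch {
      // The raw stream is captured before wrapping so the finally block can close it
      // regardless of what fails in between.
      def writeSafely(path: String, bufferSize: Int)(writeTo: OutputStream => Unit): Unit = {
        val fileOutputStream = new FileOutputStream(path)
        try {
          val out: OutputStream = new BufferedOutputStream(fileOutputStream, bufferSize)
          writeTo(out)
          out.close() // flushes the buffer; closing only the raw stream would lose buffered bytes
        } finally {
          fileOutputStream.close() // safe to call even after out.close()
        }
      }

      def main(args: Array[String]): Unit = {
        writeSafely("/tmp/broadcast-sketch.bin", 65536) { out =>
          out.write("example payload".getBytes("UTF-8"))
        }
      }
    }
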
core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala

Lines changed: 2 additions & 1 deletion

@@ -54,9 +54,10 @@ object PythonRunner {
     val pythonPath = PythonUtils.mergePythonPaths(pathElements: _*)
 
     // Launch Python process
-    val builder = new ProcessBuilder(Seq(pythonExec, "-u", formattedPythonFile) ++ otherArgs)
+    val builder = new ProcessBuilder(Seq(pythonExec, formattedPythonFile) ++ otherArgs)
     val env = builder.environment()
     env.put("PYTHONPATH", pythonPath)
+    env.put("PYTHONUNBUFFERED", "YES") // value is needed to be set to a non-empty string
     env.put("PYSPARK_GATEWAY_PORT", "" + gatewayServer.getListeningPort)
     builder.redirectErrorStream(true) // Ugly but needed for stdout and stderr to synchronize
     val process = builder.start()

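The change above drops the `-u` flag and relies on the `PYTHONUNBUFFERED` environment variable instead; Python only honours it when it is set to a non-empty string, hence the "YES" value. A small self-contained sketch of launching a process that way (the `python` executable and script path are assumptions for illustration, not taken from the commit):

    import scala.collection.JavaConverters._

    object UnbufferedLaunchSketch {
      def main(args: Array[String]): Unit = {
        // Any non-empty value enables unbuffered mode; an empty string would not.
        val builder = new ProcessBuilder(Seq("python", "/tmp/example.py").asJava)
        builder.environment().put("PYTHONUNBUFFERED", "YES")
        builder.redirectErrorStream(true) // merge stderr into stdout, as PythonRunner does
        val process = builder.start()
        scala.io.Source.fromInputStream(process.getInputStream).getLines().foreach(println)
        process.waitFor()
      }
    }
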
core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala

Lines changed: 17 additions & 25 deletions

@@ -29,8 +29,9 @@ import org.apache.spark.util.Utils
 
 /**
  * Parses and encapsulates arguments from the spark-submit script.
+ * The env argument is used for testing.
  */
-private[spark] class SparkSubmitArguments(args: Seq[String]) {
+private[spark] class SparkSubmitArguments(args: Seq[String], env: Map[String, String] = sys.env) {
   var master: String = null
   var deployMode: String = null
   var executorMemory: String = null
@@ -90,20 +91,12 @@ private[spark] class SparkSubmitArguments(args: Seq[String]) {
   private def mergeSparkProperties(): Unit = {
     // Use common defaults file, if not specified by user
     if (propertiesFile == null) {
-      sys.env.get("SPARK_CONF_DIR").foreach { sparkConfDir =>
-        val sep = File.separator
-        val defaultPath = s"${sparkConfDir}${sep}spark-defaults.conf"
-        val file = new File(defaultPath)
-        if (file.exists()) {
-          propertiesFile = file.getAbsolutePath
-        }
-      }
-    }
+      val sep = File.separator
+      val sparkHomeConfig = env.get("SPARK_HOME").map(sparkHome => s"${sparkHome}${sep}conf")
+      val confDir = env.get("SPARK_CONF_DIR").orElse(sparkHomeConfig)
 
-    if (propertiesFile == null) {
-      sys.env.get("SPARK_HOME").foreach { sparkHome =>
-        val sep = File.separator
-        val defaultPath = s"${sparkHome}${sep}conf${sep}spark-defaults.conf"
+      confDir.foreach { sparkConfDir =>
+        val defaultPath = s"${sparkConfDir}${sep}spark-defaults.conf"
         val file = new File(defaultPath)
         if (file.exists()) {
           propertiesFile = file.getAbsolutePath
@@ -117,19 +110,18 @@ private[spark] class SparkSubmitArguments(args: Seq[String]) {
 
     // Use properties file as fallback for values which have a direct analog to
     // arguments in this script.
-    master = Option(master).getOrElse(properties.get("spark.master").orNull)
-    executorMemory = Option(executorMemory)
-      .getOrElse(properties.get("spark.executor.memory").orNull)
-    executorCores = Option(executorCores)
-      .getOrElse(properties.get("spark.executor.cores").orNull)
+    master = Option(master).orElse(properties.get("spark.master")).orNull
+    executorMemory = Option(executorMemory).orElse(properties.get("spark.executor.memory")).orNull
+    executorCores = Option(executorCores).orElse(properties.get("spark.executor.cores")).orNull
     totalExecutorCores = Option(totalExecutorCores)
-      .getOrElse(properties.get("spark.cores.max").orNull)
-    name = Option(name).getOrElse(properties.get("spark.app.name").orNull)
-    jars = Option(jars).getOrElse(properties.get("spark.jars").orNull)
+      .orElse(properties.get("spark.cores.max"))
+      .orNull
+    name = Option(name).orElse(properties.get("spark.app.name")).orNull
+    jars = Option(jars).orElse(properties.get("spark.jars")).orNull
 
     // This supports env vars in older versions of Spark
-    master = Option(master).getOrElse(System.getenv("MASTER"))
-    deployMode = Option(deployMode).getOrElse(System.getenv("DEPLOY_MODE"))
+    master = Option(master).orElse(env.get("MASTER")).orNull
+    deployMode = Option(deployMode).orElse(env.get("DEPLOY_MODE")).orNull
 
     // Try to set main class from JAR if no --class argument is given
     if (mainClass == null && !isPython && primaryResource != null) {
@@ -182,7 +174,7 @@ private[spark] class SparkSubmitArguments(args: Seq[String]) {
     }
 
     if (master.startsWith("yarn")) {
-      val hasHadoopEnv = sys.env.contains("HADOOP_CONF_DIR") || sys.env.contains("YARN_CONF_DIR")
+      val hasHadoopEnv = env.contains("HADOOP_CONF_DIR") || env.contains("YARN_CONF_DIR")
       if (!hasHadoopEnv && !Utils.isTesting) {
         throw new Exception(s"When running with master '$master' " +
           "either HADOOP_CONF_DIR or YARN_CONF_DIR must be set in the environment.")

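The rewrite above replaces `Option(x).getOrElse(map.get(key).orNull)` with an `orElse` chain, which keeps everything in `Option` until the final `orNull` and makes the precedence explicit: explicit argument, then properties file, then environment variable. A minimal sketch of that fallback pattern in isolation (the maps and key names here are stand-ins, not spark-submit internals):

    object FallbackSketch {
      // Highest-priority source first; the first defined value wins.
      def resolveMaster(cliValue: String,
                        properties: Map[String, String],
                        env: Map[String, String]): String = {
        Option(cliValue)
          .orElse(properties.get("spark.master"))
          .orElse(env.get("MASTER"))
          .orNull
      }

      def main(args: Array[String]): Unit = {
        println(resolveMaster(null, Map("spark.master" -> "local[2]"), Map.empty))    // local[2]
        println(resolveMaster("yarn", Map("spark.master" -> "local[2]"), Map.empty))  // yarn
        println(resolveMaster(null, Map.empty, Map("MASTER" -> "spark://host:7077"))) // spark://host:7077
      }
    }
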