
Commit 1ed3ba2

Author: Davies Liu (committed)

Merge branch 'master' of github.com:apache/spark into narrow

2 parents: 4d29932 + 445a755

File tree: 142 files changed (+2700 / -1225 lines). Large commits have some content hidden by default, so only a subset of the changed files is shown below.

.rat-excludes

Lines changed: 1 addition & 0 deletions
@@ -19,6 +19,7 @@ fairscheduler.xml.template
 spark-defaults.conf.template
 log4j.properties
 log4j.properties.template
+metrics.properties
 metrics.properties.template
 slaves
 slaves.template

build/mvn

Lines changed: 4 additions & 1 deletion
@@ -21,6 +21,8 @@
 _DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"
 # Preserve the calling directory
 _CALLING_DIR="$(pwd)"
+# Options used during compilation
+_COMPILE_JVM_OPTS="-Xmx2g -XX:MaxPermSize=512M -XX:ReservedCodeCacheSize=512m"
 
 # Installs any application tarball given a URL, the expected tarball name,
 # and, optionally, a checkable binary path to determine if the binary has
@@ -136,14 +138,15 @@ cd "${_CALLING_DIR}"
 # Now that zinc is ensured to be installed, check its status and, if its
 # not running or just installed, start it
 if [ -n "${ZINC_INSTALL_FLAG}" -o -z "`${ZINC_BIN} -status`" ]; then
+  export ZINC_OPTS=${ZINC_OPTS:-"$_COMPILE_JVM_OPTS"}
   ${ZINC_BIN} -shutdown
   ${ZINC_BIN} -start -port ${ZINC_PORT} \
     -scala-compiler "${SCALA_COMPILER}" \
     -scala-library "${SCALA_LIBRARY}" &>/dev/null
 fi
 
 # Set any `mvn` options if not already present
-export MAVEN_OPTS=${MAVEN_OPTS:-"-Xmx2g -XX:MaxPermSize=512M -XX:ReservedCodeCacheSize=512m"}
+export MAVEN_OPTS=${MAVEN_OPTS:-"$_COMPILE_JVM_OPTS"}
 
 # Last, call the `mvn` command as usual
 ${MVN_BIN} "$@"

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 1 addition & 0 deletions
@@ -1370,6 +1370,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
     cleaner.foreach(_.stop())
     dagScheduler.stop()
     dagScheduler = null
+    progressBar.foreach(_.stop())
     taskScheduler = null
     // TODO: Cache.stop()?
     env.stop()

core/src/main/scala/org/apache/spark/api/python/PythonGatewayServer.scala

Lines changed: 64 additions & 0 deletions
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.api.python
+
+import java.io.DataOutputStream
+import java.net.Socket
+
+import py4j.GatewayServer
+
+import org.apache.spark.Logging
+import org.apache.spark.util.Utils
+
+/**
+ * Process that starts a Py4J GatewayServer on an ephemeral port and communicates the bound port
+ * back to its caller via a callback port specified by the caller.
+ *
+ * This process is launched (via SparkSubmit) by the PySpark driver (see java_gateway.py).
+ */
+private[spark] object PythonGatewayServer extends Logging {
+  def main(args: Array[String]): Unit = Utils.tryOrExit {
+    // Start a GatewayServer on an ephemeral port
+    val gatewayServer: GatewayServer = new GatewayServer(null, 0)
+    gatewayServer.start()
+    val boundPort: Int = gatewayServer.getListeningPort
+    if (boundPort == -1) {
+      logError("GatewayServer failed to bind; exiting")
+      System.exit(1)
+    } else {
+      logDebug(s"Started PythonGatewayServer on port $boundPort")
+    }
+
+    // Communicate the bound port back to the caller via the caller-specified callback port
+    val callbackHost = sys.env("_PYSPARK_DRIVER_CALLBACK_HOST")
+    val callbackPort = sys.env("_PYSPARK_DRIVER_CALLBACK_PORT").toInt
+    logDebug(s"Communicating GatewayServer port to Python driver at $callbackHost:$callbackPort")
+    val callbackSocket = new Socket(callbackHost, callbackPort)
+    val dos = new DataOutputStream(callbackSocket.getOutputStream)
+    dos.writeInt(boundPort)
+    dos.close()
+    callbackSocket.close()
+
+    // Exit on EOF or broken pipe to ensure that this process dies when the Python driver dies:
+    while (System.in.read() != -1) {
+      // Do nothing
+    }
+    logDebug("Exiting due to broken pipe from Python driver")
+    System.exit(0)
+  }
+}
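For context, the handshake above has two ends: PythonGatewayServer connects to a caller-supplied callback port and writes its own bound port back as a single int. Below is a minimal, standalone Scala sketch of the caller's side of that handshake; the real caller is PySpark's java_gateway.py, and the object name and printlns here are illustrative only, not part of this commit.

import java.io.DataInputStream
import java.net.ServerSocket

object CallbackListenerSketch {
  def main(args: Array[String]): Unit = {
    val callbackServer = new ServerSocket(0)   // listen on an ephemeral callback port
    val callbackPort = callbackServer.getLocalPort
    // A real launcher would now export _PYSPARK_DRIVER_CALLBACK_HOST / _PYSPARK_DRIVER_CALLBACK_PORT
    // to the spark-submit child process; that step is omitted in this sketch.
    println(s"child should connect back on port $callbackPort")
    val sock = callbackServer.accept()         // blocks until PythonGatewayServer connects
    val in = new DataInputStream(sock.getInputStream)
    val gatewayPort = in.readInt()             // the Py4J GatewayServer's bound port
    println(s"Py4J gateway is listening on port $gatewayPort")
    in.close(); sock.close(); callbackServer.close()
  }
}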

core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala

Lines changed: 2 additions & 2 deletions
@@ -248,13 +248,13 @@ private[spark] class PythonRDD(
     } catch {
       case e: Exception if context.isCompleted || context.isInterrupted =>
         logDebug("Exception thrown after task completion (likely due to cleanup)", e)
-        worker.shutdownOutput()
+        Utils.tryLog(worker.shutdownOutput())
 
       case e: Exception =>
         // We must avoid throwing exceptions here, because the thread uncaught exception handler
        // will kill the whole executor (see org.apache.spark.executor.Executor).
         _exception = e
-        worker.shutdownOutput()
+        Utils.tryLog(worker.shutdownOutput())
     } finally {
       // Release memory used by this thread for shuffles
       env.shuffleMemoryManager.releaseMemoryForThisThread()
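The change above wraps worker.shutdownOutput() so that a failure while closing the socket is logged rather than rethrown from the catch block. A minimal sketch of that tryLog-style pattern follows; the signature is assumed for illustration, and the real helper lives in org.apache.spark.util.Utils.

import scala.util.{Failure, Success, Try}

object TryLogSketch {
  // Run a side-effecting block and log any exception instead of letting it
  // escape (and potentially kill the owning thread or executor).
  def tryLog[T](f: => T): Try[T] = Try(f) match {
    case s @ Success(_) => s
    case fail @ Failure(t) =>
      System.err.println(s"Uncaught exception in thread ${Thread.currentThread().getName}: $t")
      fail
  }

  def main(args: Array[String]): Unit = {
    tryLog(sys.error("socket already closed"))  // exception is logged, not rethrown
    println("writer thread keeps running")
  }
}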

core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala

Lines changed: 1 addition & 3 deletions
@@ -39,7 +39,6 @@ import org.apache.ivy.plugins.resolver.{ChainResolver, IBiblioResolver}
 
 import org.apache.spark.SPARK_VERSION
 import org.apache.spark.deploy.rest._
-import org.apache.spark.executor._
 import org.apache.spark.util.{ChildFirstURLClassLoader, MutableURLClassLoader, Utils}
 
 /**
@@ -284,8 +283,7 @@ object SparkSubmit {
     // If we're running a python app, set the main class to our specific python runner
     if (args.isPython && deployMode == CLIENT) {
       if (args.primaryResource == PYSPARK_SHELL) {
-        args.mainClass = "py4j.GatewayServer"
-        args.childArgs = ArrayBuffer("--die-on-broken-pipe", "0")
+        args.mainClass = "org.apache.spark.api.python.PythonGatewayServer"
       } else {
         // If a python file is provided, add it to the child arguments and list of files to deploy.
         // Usage: PythonAppRunner <main python file> <extra python files> [app arguments]

core/src/main/scala/org/apache/spark/deploy/rest/SubmitRestProtocolMessage.scala

Lines changed: 8 additions & 8 deletions
@@ -17,8 +17,6 @@
 
 package org.apache.spark.deploy.rest
 
-import scala.util.Try
-
 import com.fasterxml.jackson.annotation._
 import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility
 import com.fasterxml.jackson.annotation.JsonInclude.Include
@@ -111,12 +109,14 @@ private[spark] object SubmitRestProtocolMessage {
    * If the action field is not found, throw a [[SubmitRestMissingFieldException]].
    */
   def parseAction(json: String): String = {
-    parse(json).asInstanceOf[JObject].obj
-      .find { case (f, _) => f == "action" }
-      .map { case (_, v) => v.asInstanceOf[JString].s }
-      .getOrElse {
-        throw new SubmitRestMissingFieldException(s"Action field not found in JSON:\n$json")
-      }
+    val value: Option[String] = parse(json) match {
+      case JObject(fields) =>
+        fields.collectFirst { case ("action", v) => v }.collect { case JString(s) => s }
+      case _ => None
+    }
+    value.getOrElse {
+      throw new SubmitRestMissingFieldException(s"Action field not found in JSON:\n$json")
+    }
   }
 
   /**
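The rewrite above switches from casting the parsed JSON to pattern matching on the json4s AST, so inputs that are not objects (or whose "action" value is not a string) no longer throw ClassCastException. A standalone sketch of that pattern, with an illustrative object name and assuming the json4s-jackson artifact Spark already depends on:

import org.json4s._
import org.json4s.jackson.JsonMethods.parse

object ParseActionSketch {
  // Match on the AST instead of casting, so non-objects and non-string values fall through to None.
  def parseAction(json: String): Option[String] = parse(json) match {
    case JObject(fields) => fields.collectFirst { case ("action", JString(s)) => s }
    case _ => None
  }

  def main(args: Array[String]): Unit = {
    println(parseAction("""{"action": "CreateSubmissionRequest"}"""))  // Some(CreateSubmissionRequest)
    println(parseAction("""{"action": 42}"""))                         // None: value is not a JSON string
  }
}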

core/src/main/scala/org/apache/spark/metrics/MetricsConfig.scala

Lines changed: 17 additions & 15 deletions
@@ -28,12 +28,12 @@ import org.apache.spark.util.Utils
 
 private[spark] class MetricsConfig(val configFile: Option[String]) extends Logging {
 
-  val DEFAULT_PREFIX = "*"
-  val INSTANCE_REGEX = "^(\\*|[a-zA-Z]+)\\.(.+)".r
-  val METRICS_CONF = "metrics.properties"
+  private val DEFAULT_PREFIX = "*"
+  private val INSTANCE_REGEX = "^(\\*|[a-zA-Z]+)\\.(.+)".r
+  private val DEFAULT_METRICS_CONF_FILENAME = "metrics.properties"
 
-  val properties = new Properties()
-  var propertyCategories: mutable.HashMap[String, Properties] = null
+  private[metrics] val properties = new Properties()
+  private[metrics] var propertyCategories: mutable.HashMap[String, Properties] = null
 
   private def setDefaultProperties(prop: Properties) {
     prop.setProperty("*.sink.servlet.class", "org.apache.spark.metrics.sink.MetricsServlet")
@@ -47,20 +47,22 @@ private[spark] class MetricsConfig(val configFile: Option[String]) extends Loggi
     setDefaultProperties(properties)
 
     // If spark.metrics.conf is not set, try to get file in class path
-    var is: InputStream = null
-    try {
-      is = configFile match {
-        case Some(f) => new FileInputStream(f)
-        case None => Utils.getSparkClassLoader.getResourceAsStream(METRICS_CONF)
+    val isOpt: Option[InputStream] = configFile.map(new FileInputStream(_)).orElse {
+      try {
+        Option(Utils.getSparkClassLoader.getResourceAsStream(DEFAULT_METRICS_CONF_FILENAME))
+      } catch {
+        case e: Exception =>
+          logError("Error loading default configuration file", e)
+          None
       }
+    }
 
-    if (is != null) {
+    isOpt.foreach { is =>
+      try {
         properties.load(is)
+      } finally {
+        is.close()
       }
-    } catch {
-      case e: Exception => logError("Error loading configure file", e)
-    } finally {
-      if (is != null) is.close()
     }
 
     propertyCategories = subProperties(properties, INSTANCE_REGEX)
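Read in isolation, the new loading logic above follows a file-or-classpath pattern: prefer an explicitly configured file, fall back to a bundled default resource, and always close the stream. A small standalone sketch of the same idea; the object, method, and resource names here are illustrative, not Spark's API.

import java.io.{FileInputStream, InputStream}
import java.util.Properties

object LoadPropertiesSketch {
  // Prefer an explicit path, fall back to a classpath resource, and close the
  // stream whether or not Properties.load succeeds.
  def load(configFile: Option[String], defaultResource: String): Properties = {
    val props = new Properties()
    val isOpt: Option[InputStream] = configFile.map(new FileInputStream(_)).orElse {
      Option(getClass.getClassLoader.getResourceAsStream(defaultResource))
    }
    isOpt.foreach { is =>
      try props.load(is) finally is.close()
    }
    props
  }

  def main(args: Array[String]): Unit =
    println(load(None, "metrics.properties"))  // empty unless the resource is on the classpath
}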

core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala

Lines changed: 4 additions & 1 deletion
@@ -191,7 +191,10 @@ private[spark] class MetricsSystem private (
         sinks += sink.asInstanceOf[Sink]
       }
     } catch {
-      case e: Exception => logError("Sink class " + classPath + " cannot be instantialized", e)
+      case e: Exception => {
+        logError("Sink class " + classPath + " cannot be instantialized")
+        throw e
+      }
     }
   }
 }

core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala

Lines changed: 23 additions & 17 deletions
@@ -18,6 +18,7 @@
 package org.apache.spark.scheduler
 
 import java.nio.ByteBuffer
+import java.util.concurrent.RejectedExecutionException
 
 import scala.language.existentials
 import scala.util.control.NonFatal
@@ -95,25 +96,30 @@ private[spark] class TaskResultGetter(sparkEnv: SparkEnv, scheduler: TaskSchedul
   def enqueueFailedTask(taskSetManager: TaskSetManager, tid: Long, taskState: TaskState,
     serializedData: ByteBuffer) {
     var reason : TaskEndReason = UnknownReason
-    getTaskResultExecutor.execute(new Runnable {
-      override def run(): Unit = Utils.logUncaughtExceptions {
-        try {
-          if (serializedData != null && serializedData.limit() > 0) {
-            reason = serializer.get().deserialize[TaskEndReason](
-              serializedData, Utils.getSparkClassLoader)
+    try {
+      getTaskResultExecutor.execute(new Runnable {
+        override def run(): Unit = Utils.logUncaughtExceptions {
+          try {
+            if (serializedData != null && serializedData.limit() > 0) {
+              reason = serializer.get().deserialize[TaskEndReason](
+                serializedData, Utils.getSparkClassLoader)
+            }
+          } catch {
+            case cnd: ClassNotFoundException =>
+              // Log an error but keep going here -- the task failed, so not catastrophic
+              // if we can't deserialize the reason.
+              val loader = Utils.getContextOrSparkClassLoader
+              logError(
+                "Could not deserialize TaskEndReason: ClassNotFound with classloader " + loader)
+            case ex: Exception => {}
           }
-        } catch {
-          case cnd: ClassNotFoundException =>
-            // Log an error but keep going here -- the task failed, so not catastrophic if we can't
-            // deserialize the reason.
-            val loader = Utils.getContextOrSparkClassLoader
-            logError(
-              "Could not deserialize TaskEndReason: ClassNotFound with classloader " + loader)
-          case ex: Exception => {}
+          scheduler.handleFailedTask(taskSetManager, tid, taskState, reason)
         }
-        scheduler.handleFailedTask(taskSetManager, tid, taskState, reason)
-      }
-    })
+      })
+    } catch {
+      case e: RejectedExecutionException if sparkEnv.isStopped =>
+        // ignore it
+    }
   }
 
   def stop() {
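The guard added above handles a shutdown race: once the result-getter's thread pool has been shut down, execute() throws RejectedExecutionException, and that is harmless if the environment is already stopping. A standalone sketch of the same idea, where the stopped flag stands in for sparkEnv.isStopped and the object name is illustrative:

import java.util.concurrent.{Executors, RejectedExecutionException}

object RejectedDuringShutdownSketch {
  @volatile private var stopped = false

  def main(args: Array[String]): Unit = {
    val pool = Executors.newFixedThreadPool(1)
    stopped = true
    pool.shutdown()    // after this, execute() throws RejectedExecutionException
    try {
      pool.execute(new Runnable { override def run(): Unit = println("never runs") })
    } catch {
      case _: RejectedExecutionException if stopped => // expected during shutdown; ignore
    }
    println("shutdown race handled without crashing")
  }
}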

core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala

Lines changed: 13 additions & 5 deletions
@@ -49,7 +49,7 @@ private[spark] class DiskBlockManager(blockManager: BlockManager, conf: SparkCon
   }
   private val subDirs = Array.fill(localDirs.length)(new Array[File](subDirsPerLocalDir))
 
-  addShutdownHook()
+  private val shutdownHook = addShutdownHook()
 
   /** Looks up a file by hashing it into one of our local subdirectories. */
   // This method should be kept in sync with
@@ -134,17 +134,25 @@ private[spark] class DiskBlockManager(blockManager: BlockManager, conf: SparkCon
     }
   }
 
-  private def addShutdownHook() {
-    Runtime.getRuntime.addShutdownHook(new Thread("delete Spark local dirs") {
+  private def addShutdownHook(): Thread = {
+    val shutdownHook = new Thread("delete Spark local dirs") {
       override def run(): Unit = Utils.logUncaughtExceptions {
         logDebug("Shutdown hook called")
-        DiskBlockManager.this.stop()
+        DiskBlockManager.this.doStop()
       }
-    })
+    }
+    Runtime.getRuntime.addShutdownHook(shutdownHook)
+    shutdownHook
   }
 
   /** Cleanup local dirs and stop shuffle sender. */
   private[spark] def stop() {
+    // Remove the shutdown hook. It causes memory leaks if we leave it around.
+    Runtime.getRuntime.removeShutdownHook(shutdownHook)
+    doStop()
+  }
+
+  private def doStop(): Unit = {
     // Only perform cleanup if an external service is not serving our shuffle files.
     if (!blockManager.externalShuffleServiceEnabled || blockManager.blockManagerId.isDriver) {
       localDirs.foreach { localDir =>
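The restructuring above separates the cleanup body (doStop) from the public stop() so that stop() can also unregister the shutdown hook; otherwise the JVM keeps a reference to the hook thread, and everything it captures, until exit. A minimal standalone sketch of that register/unregister lifecycle, with illustrative names:

object ShutdownHookSketch {
  // Keep a reference to the registered hook so an explicit stop() can unregister it.
  private val shutdownHook = new Thread("delete local dirs (sketch)") {
    override def run(): Unit = doStop()
  }
  Runtime.getRuntime.addShutdownHook(shutdownHook)

  def stop(): Unit = {
    Runtime.getRuntime.removeShutdownHook(shutdownHook)  // avoid leaking the hook after explicit stop
    doStop()
  }

  private def doStop(): Unit = println("cleaning up (sketch)")

  def main(args: Array[String]): Unit = stop()
}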

core/src/main/scala/org/apache/spark/ui/ConsoleProgressBar.scala

Lines changed: 6 additions & 1 deletion
@@ -28,7 +28,6 @@ import org.apache.spark._
  * of them will be combined together, showed in one line.
  */
 private[spark] class ConsoleProgressBar(sc: SparkContext) extends Logging {
-
   // Carrige return
   val CR = '\r'
   // Update period of progress bar, in milliseconds
@@ -121,4 +120,10 @@ private[spark] class ConsoleProgressBar(sc: SparkContext) extends Logging {
     clear()
     lastFinishTime = System.currentTimeMillis()
   }
+
+  /**
+   * Tear down the timer thread. The timer thread is a GC root, and it retains the entire
+   * SparkContext if it's not terminated.
+   */
+  def stop(): Unit = timer.cancel()
 }
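The new stop() above exists because a java.util.Timer owns a live thread, and a scheduled task that captures its enclosing object keeps that object reachable until cancel() is called. A standalone sketch of why that matters; class names, the reporter field, and the intervals are illustrative, not ConsoleProgressBar's actual members.

import java.util.{Timer, TimerTask}

class ProgressReporterSketch(name: String) {
  // The scheduled task captures `this`, so the reporter stays reachable
  // until cancel() stops the timer thread.
  private val timer = new Timer("refresh progress (sketch)", true)
  timer.schedule(new TimerTask {
    override def run(): Unit = println(s"refreshing $name")
  }, 0L, 200L)

  def stop(): Unit = timer.cancel()
}

object TimerLeakSketch {
  def main(args: Array[String]): Unit = {
    val reporter = new ProgressReporterSketch("job 0")
    Thread.sleep(500)
    reporter.stop()  // without this, the timer thread would keep the reporter alive
  }
}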

core/src/main/scala/org/apache/spark/util/Utils.scala

Lines changed: 2 additions & 2 deletions
@@ -213,8 +213,8 @@ private[spark] object Utils extends Logging {
   // Is the path already registered to be deleted via a shutdown hook ?
   def hasShutdownDeleteTachyonDir(file: TachyonFile): Boolean = {
     val absolutePath = file.getPath()
-    shutdownDeletePaths.synchronized {
-      shutdownDeletePaths.contains(absolutePath)
+    shutdownDeleteTachyonPaths.synchronized {
+      shutdownDeleteTachyonPaths.contains(absolutePath)
     }
   }

core/src/test/resources/test_metrics_system.properties

Lines changed: 0 additions & 2 deletions
@@ -18,7 +18,5 @@
 *.sink.console.period = 10
 *.sink.console.unit = seconds
 test.sink.console.class = org.apache.spark.metrics.sink.ConsoleSink
-test.sink.dummy.class = org.apache.spark.metrics.sink.DummySink
-test.source.dummy.class = org.apache.spark.metrics.source.DummySource
 test.sink.console.period = 20
 test.sink.console.unit = minutes

core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestSubmitSuite.scala

Lines changed: 4 additions & 0 deletions
@@ -245,13 +245,15 @@ class StandaloneRestSubmitSuite extends FunSuite with BeforeAndAfterEach {
     val goodJson = constructSubmitRequest(masterUrl).toJson
     val badJson1 = goodJson.replaceAll("action", "fraction") // invalid JSON
     val badJson2 = goodJson.substring(goodJson.size / 2) // malformed JSON
+    val notJson = "\"hello, world\""
     val (response1, code1) = sendHttpRequestWithResponse(submitRequestPath, "POST") // missing JSON
     val (response2, code2) = sendHttpRequestWithResponse(submitRequestPath, "POST", badJson1)
     val (response3, code3) = sendHttpRequestWithResponse(submitRequestPath, "POST", badJson2)
     val (response4, code4) = sendHttpRequestWithResponse(killRequestPath, "POST") // missing ID
     val (response5, code5) = sendHttpRequestWithResponse(s"$killRequestPath/", "POST")
     val (response6, code6) = sendHttpRequestWithResponse(statusRequestPath, "GET") // missing ID
     val (response7, code7) = sendHttpRequestWithResponse(s"$statusRequestPath/", "GET")
+    val (response8, code8) = sendHttpRequestWithResponse(submitRequestPath, "POST", notJson)
     // these should all fail as error responses
     getErrorResponse(response1)
     getErrorResponse(response2)
@@ -260,13 +262,15 @@ class StandaloneRestSubmitSuite extends FunSuite with BeforeAndAfterEach {
     getErrorResponse(response5)
     getErrorResponse(response6)
     getErrorResponse(response7)
+    getErrorResponse(response8)
     assert(code1 === HttpServletResponse.SC_BAD_REQUEST)
     assert(code2 === HttpServletResponse.SC_BAD_REQUEST)
     assert(code3 === HttpServletResponse.SC_BAD_REQUEST)
     assert(code4 === HttpServletResponse.SC_BAD_REQUEST)
     assert(code5 === HttpServletResponse.SC_BAD_REQUEST)
     assert(code6 === HttpServletResponse.SC_BAD_REQUEST)
     assert(code7 === HttpServletResponse.SC_BAD_REQUEST)
+    assert(code8 === HttpServletResponse.SC_BAD_REQUEST)
   }
 
   test("bad request paths") {
