
Commit a3eb717

Merge branch 'master' of github.com:apache/spark into SPARK-5811
2 parents: c60156d + c06e42f

180 files changed, +4563 -1916 lines


assembly/pom.xml

Lines changed: 10 additions & 0 deletions
@@ -114,6 +114,16 @@
             <exclude>META-INF/*.RSA</exclude>
           </excludes>
         </filter>
+        <filter>
+          <!-- Exclude libgfortran, libgcc for license issues -->
+          <artifact>org.jblas:jblas</artifact>
+          <excludes>
+            <!-- Linux amd64 is OK; not statically linked -->
+            <exclude>lib/Linux/i386/**</exclude>
+            <exclude>lib/Mac OS X/**</exclude>
+            <exclude>lib/Windows/**</exclude>
+          </excludes>
+        </filter>
       </filters>
     </configuration>
     <executions>

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 1 addition & 0 deletions
@@ -1363,6 +1363,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
     cleaner.foreach(_.stop())
     dagScheduler.stop()
     dagScheduler = null
+    progressBar.foreach(_.stop())
     taskScheduler = null
     // TODO: Cache.stop()?
     env.stop()
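The progress bar is held as an Option, since the console progress bar is only created when enabled, so foreach(_.stop()) tears it down when present and is a no-op otherwise. A small generic sketch of that Option-guarded cleanup idiom (the names below are illustrative, not Spark's):

object OptionGuardedCleanup {
  trait Stoppable { def stop(): Unit }

  final class ConsoleReporter extends Stoppable {
    def stop(): Unit = println("console reporter stopped")
  }

  def main(args: Array[String]): Unit = {
    // Created only when the feature is enabled, mirroring an optional driver component.
    val reporter: Option[Stoppable] =
      if (args.contains("--progress")) Some(new ConsoleReporter) else None

    // Shutdown path: stop it if it exists, silently do nothing otherwise.
    reporter.foreach(_.stop())
  }
}
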
core/src/main/scala/org/apache/spark/api/python/PythonGatewayServer.scala

Lines changed: 64 additions & 0 deletions
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.api.python
+
+import java.io.DataOutputStream
+import java.net.Socket
+
+import py4j.GatewayServer
+
+import org.apache.spark.Logging
+import org.apache.spark.util.Utils
+
+/**
+ * Process that starts a Py4J GatewayServer on an ephemeral port and communicates the bound port
+ * back to its caller via a callback port specified by the caller.
+ *
+ * This process is launched (via SparkSubmit) by the PySpark driver (see java_gateway.py).
+ */
+private[spark] object PythonGatewayServer extends Logging {
+  def main(args: Array[String]): Unit = Utils.tryOrExit {
+    // Start a GatewayServer on an ephemeral port
+    val gatewayServer: GatewayServer = new GatewayServer(null, 0)
+    gatewayServer.start()
+    val boundPort: Int = gatewayServer.getListeningPort
+    if (boundPort == -1) {
+      logError("GatewayServer failed to bind; exiting")
+      System.exit(1)
+    } else {
+      logDebug(s"Started PythonGatewayServer on port $boundPort")
+    }
+
+    // Communicate the bound port back to the caller via the caller-specified callback port
+    val callbackHost = sys.env("_PYSPARK_DRIVER_CALLBACK_HOST")
+    val callbackPort = sys.env("_PYSPARK_DRIVER_CALLBACK_PORT").toInt
+    logDebug(s"Communicating GatewayServer port to Python driver at $callbackHost:$callbackPort")
+    val callbackSocket = new Socket(callbackHost, callbackPort)
+    val dos = new DataOutputStream(callbackSocket.getOutputStream)
+    dos.writeInt(boundPort)
+    dos.close()
+    callbackSocket.close()
+
+    // Exit on EOF or broken pipe to ensure that this process dies when the Python driver dies:
+    while (System.in.read() != -1) {
+      // Do nothing
+    }
+    logDebug("Exiting due to broken pipe from Python driver")
+    System.exit(0)
+  }
+}
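For context, the caller owns the other end of this handshake: it listens on an ephemeral callback port, exports that port and host through the two environment variables read above, launches this process via SparkSubmit, and reads back the Py4J port as a single 4-byte integer. Below is a minimal Scala sketch of that receiving side; it only illustrates the wire format (in the actual commit the caller is PySpark's java_gateway.py), and GatewayPortReceiver is a made-up name.

import java.io.DataInputStream
import java.net.ServerSocket

object GatewayPortReceiver {
  def main(args: Array[String]): Unit = {
    // Listen on an ephemeral callback port. Its number would be passed to the child JVM
    // through _PYSPARK_DRIVER_CALLBACK_PORT (and the host through _PYSPARK_DRIVER_CALLBACK_HOST)
    // before launching PythonGatewayServer.
    val callbackServer = new ServerSocket(0)
    println(s"Callback port: ${callbackServer.getLocalPort}")

    // PythonGatewayServer connects back and writes the bound Py4J port as one 4-byte int.
    val socket = callbackServer.accept()
    val in = new DataInputStream(socket.getInputStream)
    val gatewayPort = in.readInt()
    println(s"Py4J GatewayServer is listening on port $gatewayPort")

    in.close()
    socket.close()
    callbackServer.close()
  }
}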

core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala

Lines changed: 19 additions & 6 deletions
@@ -144,11 +144,24 @@ private[spark] class PythonRDD(
         stream.readFully(update)
         accumulator += Collections.singletonList(update)
       }
+
       // Check whether the worker is ready to be re-used.
-      if (stream.readInt() == SpecialLengths.END_OF_STREAM) {
-        if (reuse_worker) {
-          env.releasePythonWorker(pythonExec, envVars.toMap, worker)
-          released = true
+      if (reuse_worker) {
+        // It has a high possibility that the ending mark is already available,
+        // And current task should not be blocked by checking it
+
+        if (stream.available() >= 4) {
+          val ending = stream.readInt()
+          if (ending == SpecialLengths.END_OF_STREAM) {
+            env.releasePythonWorker(pythonExec, envVars.toMap, worker)
+            released = true
+            logInfo(s"Communication with worker ended cleanly, re-use it: $worker")
+          } else {
+            logInfo(s"Communication with worker did not end cleanly " +
+              s"(ending with $ending), close it: $worker")
+          }
+        } else {
+          logInfo(s"The ending mark from worker is not available, close it: $worker")
         }
       }
       null
@@ -248,13 +261,13 @@ private[spark] class PythonRDD(
     } catch {
       case e: Exception if context.isCompleted || context.isInterrupted =>
         logDebug("Exception thrown after task completion (likely due to cleanup)", e)
-        worker.shutdownOutput()
+        Utils.tryLog(worker.shutdownOutput())

       case e: Exception =>
         // We must avoid throwing exceptions here, because the thread uncaught exception handler
         // will kill the whole executor (see org.apache.spark.executor.Executor).
         _exception = e
-        worker.shutdownOutput()
+        Utils.tryLog(worker.shutdownOutput())
     } finally {
       // Release memory used by this thread for shuffles
       env.shuffleMemoryManager.releaseMemoryForThisThread()
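The key change in the first hunk is that the task no longer blocks on stream.readInt() waiting for the end-of-stream marker: it consumes the 4-byte marker only if it is already buffered (stream.available() >= 4), and otherwise closes the worker instead of reusing it. A self-contained sketch of that non-blocking check, using a placeholder marker value rather than Spark's actual SpecialLengths constant:

import java.io.{ByteArrayInputStream, DataInputStream}

object NonBlockingEndMarkCheck {
  val END_OF_STREAM = -4 // placeholder value for illustration

  // Returns true only if a well-formed end marker is already buffered; never blocks.
  def endedCleanly(in: DataInputStream): Boolean =
    in.available() >= 4 && in.readInt() == END_OF_STREAM

  def main(args: Array[String]): Unit = {
    val marker = java.nio.ByteBuffer.allocate(4).putInt(END_OF_STREAM).array()
    val ready = new DataInputStream(new ByteArrayInputStream(marker))
    val empty = new DataInputStream(new ByteArrayInputStream(Array.empty[Byte]))
    println(endedCleanly(ready)) // true: the marker was already available
    println(endedCleanly(empty)) // false: nothing buffered, and we did not block
  }
}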

core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala

Lines changed: 1 addition & 3 deletions
@@ -39,7 +39,6 @@ import org.apache.ivy.plugins.resolver.{ChainResolver, IBiblioResolver}
 
 import org.apache.spark.SPARK_VERSION
 import org.apache.spark.deploy.rest._
-import org.apache.spark.executor._
 import org.apache.spark.util.{ChildFirstURLClassLoader, MutableURLClassLoader, Utils}
 
 /**
@@ -302,8 +301,7 @@ object SparkSubmit {
     // If we're running a python app, set the main class to our specific python runner
     if (args.isPython && deployMode == CLIENT) {
       if (args.primaryResource == PYSPARK_SHELL) {
-        args.mainClass = "py4j.GatewayServer"
-        args.childArgs = ArrayBuffer("--die-on-broken-pipe", "0")
+        args.mainClass = "org.apache.spark.api.python.PythonGatewayServer"
       } else {
         // If a python file is provided, add it to the child arguments and list of files to deploy.
         // Usage: PythonAppRunner <main python file> <extra python files> [app arguments]

core/src/main/scala/org/apache/spark/deploy/rest/SubmitRestProtocolMessage.scala

Lines changed: 8 additions & 8 deletions
@@ -17,8 +17,6 @@
 
 package org.apache.spark.deploy.rest
 
-import scala.util.Try
-
 import com.fasterxml.jackson.annotation._
 import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility
 import com.fasterxml.jackson.annotation.JsonInclude.Include
@@ -111,12 +109,14 @@ private[spark] object SubmitRestProtocolMessage {
    * If the action field is not found, throw a [[SubmitRestMissingFieldException]].
    */
   def parseAction(json: String): String = {
-    parse(json).asInstanceOf[JObject].obj
-      .find { case (f, _) => f == "action" }
-      .map { case (_, v) => v.asInstanceOf[JString].s }
-      .getOrElse {
-        throw new SubmitRestMissingFieldException(s"Action field not found in JSON:\n$json")
-      }
+    val value: Option[String] = parse(json) match {
+      case JObject(fields) =>
+        fields.collectFirst { case ("action", v) => v }.collect { case JString(s) => s }
+      case _ => None
+    }
+    value.getOrElse {
+      throw new SubmitRestMissingFieldException(s"Action field not found in JSON:\n$json")
+    }
   }
 
   /**
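The rewritten parseAction drops the unchecked asInstanceOf casts (and the now-unused scala.util.Try import) in favour of pattern matching on the json4s AST, so a non-object document such as a bare JSON string falls through to the error path instead of throwing a ClassCastException. A standalone sketch of the same extraction pattern, using json4s-jackson with an illustrative field name and inputs:

import org.json4s._
import org.json4s.jackson.JsonMethods.parse

object ActionFieldExample {
  // Extract a top-level string field from a JSON document, or None if it is
  // absent, not a string, or the document is not a JSON object.
  def stringField(json: String, name: String): Option[String] = parse(json) match {
    case JObject(fields) =>
      fields.collectFirst { case (`name`, JString(s)) => s }
    case _ => None
  }

  def main(args: Array[String]): Unit = {
    println(stringField("""{"action":"CreateSubmissionRequest"}""", "action")) // Some(CreateSubmissionRequest)
    println(stringField(""""hello, world"""", "action"))                       // None
  }
}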

core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala

Lines changed: 9 additions & 4 deletions
@@ -49,7 +49,7 @@ private[spark] class DiskBlockManager(blockManager: BlockManager, conf: SparkCon
   }
   private val subDirs = Array.fill(localDirs.length)(new Array[File](subDirsPerLocalDir))
 
-  addShutdownHook()
+  private val shutdownHook = addShutdownHook()
 
   /** Looks up a file by hashing it into one of our local subdirectories. */
   // This method should be kept in sync with
@@ -134,17 +134,22 @@ private[spark] class DiskBlockManager(blockManager: BlockManager, conf: SparkCon
     }
   }
 
-  private def addShutdownHook() {
-    Runtime.getRuntime.addShutdownHook(new Thread("delete Spark local dirs") {
+  private def addShutdownHook(): Thread = {
+    val shutdownHook = new Thread("delete Spark local dirs") {
       override def run(): Unit = Utils.logUncaughtExceptions {
         logDebug("Shutdown hook called")
        DiskBlockManager.this.stop()
       }
-    })
+    }
+    Runtime.getRuntime.addShutdownHook(shutdownHook)
+    shutdownHook
   }
 
   /** Cleanup local dirs and stop shuffle sender. */
   private[spark] def stop() {
+    // Remove the shutdown hook. It causes memory leaks if we leave it around.
+    Runtime.getRuntime.removeShutdownHook(shutdownHook)
+
     // Only perform cleanup if an external service is not serving our shuffle files.
     if (!blockManager.externalShuffleServiceEnabled || blockManager.blockManagerId.isDriver) {
       localDirs.foreach { localDir =>
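The change keeps a reference to the registered hook so that stop() can deregister it; otherwise every DiskBlockManager stopped before JVM exit leaves its hook Thread (and everything the hook references) registered until shutdown. A generic sketch of the register-then-deregister pattern with illustrative names; note that Runtime.removeShutdownHook throws IllegalStateException once shutdown has begun, which the sketch swallows:

object ShutdownHookPattern {
  final class ScratchDirManager(name: String) {
    private val hook = new Thread(s"delete scratch dir $name") {
      override def run(): Unit = cleanup()
    }
    Runtime.getRuntime.addShutdownHook(hook)

    private def cleanup(): Unit = {
      // Real code would delete temporary files here.
      println(s"cleaning up scratch dir $name")
    }

    /** Explicit stop: deregister the hook first so it cannot pin this object until JVM exit. */
    def stop(): Unit = {
      try Runtime.getRuntime.removeShutdownHook(hook)
      catch { case _: IllegalStateException => () } // JVM is already shutting down
      cleanup()
    }
  }

  def main(args: Array[String]): Unit = {
    val manager = new ScratchDirManager("demo")
    manager.stop() // cleans up now and removes the hook, so nothing runs again at exit
  }
}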

core/src/main/scala/org/apache/spark/ui/ConsoleProgressBar.scala

Lines changed: 6 additions & 1 deletion
@@ -28,7 +28,6 @@ import org.apache.spark._
  * of them will be combined together, showed in one line.
  */
 private[spark] class ConsoleProgressBar(sc: SparkContext) extends Logging {
-
   // Carrige return
   val CR = '\r'
   // Update period of progress bar, in milliseconds
@@ -121,4 +120,10 @@ private[spark] class ConsoleProgressBar(sc: SparkContext) extends Logging {
     clear()
     lastFinishTime = System.currentTimeMillis()
   }
+
+  /**
+   * Tear down the timer thread. The timer thread is a GC root, and it retains the entire
+   * SparkContext if it's not terminated.
+   */
+  def stop(): Unit = timer.cancel()
 }
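The new scaladoc spells out the leak: the timer's worker thread is a GC root, and its queued task closes over the ConsoleProgressBar, which in turn holds the SparkContext, so nothing is collected until the timer is cancelled. A minimal sketch of that retention pattern outside Spark, assuming the timer is a java.util.Timer as the cancel() call suggests (class and field names are illustrative):

import java.util.{Timer, TimerTask}

object TimerRetentionDemo {
  final class Owner {
    // Stand-in for a heavyweight object such as a SparkContext.
    val payload = new Array[Byte](64 * 1024 * 1024)

    private val timer = new Timer("refresh progress", /* isDaemon = */ true)
    timer.schedule(new TimerTask {
      override def run(): Unit = () // the task closes over Owner.this, keeping it reachable
    }, 0L, 200L)

    // Without this, the timer thread (a GC root) keeps the scheduled task alive, the task
    // keeps Owner and its payload alive, and none of it is collected even after all other
    // references are dropped.
    def stop(): Unit = timer.cancel()
  }

  def main(args: Array[String]): Unit = {
    val owner = new Owner
    owner.stop() // cancel the timer so `owner` can be garbage collected once unreferenced
  }
}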

core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestSubmitSuite.scala

Lines changed: 4 additions & 0 deletions
@@ -245,13 +245,15 @@ class StandaloneRestSubmitSuite extends FunSuite with BeforeAndAfterEach {
     val goodJson = constructSubmitRequest(masterUrl).toJson
     val badJson1 = goodJson.replaceAll("action", "fraction") // invalid JSON
     val badJson2 = goodJson.substring(goodJson.size / 2) // malformed JSON
+    val notJson = "\"hello, world\""
     val (response1, code1) = sendHttpRequestWithResponse(submitRequestPath, "POST") // missing JSON
     val (response2, code2) = sendHttpRequestWithResponse(submitRequestPath, "POST", badJson1)
     val (response3, code3) = sendHttpRequestWithResponse(submitRequestPath, "POST", badJson2)
     val (response4, code4) = sendHttpRequestWithResponse(killRequestPath, "POST") // missing ID
     val (response5, code5) = sendHttpRequestWithResponse(s"$killRequestPath/", "POST")
     val (response6, code6) = sendHttpRequestWithResponse(statusRequestPath, "GET") // missing ID
     val (response7, code7) = sendHttpRequestWithResponse(s"$statusRequestPath/", "GET")
+    val (response8, code8) = sendHttpRequestWithResponse(submitRequestPath, "POST", notJson)
     // these should all fail as error responses
     getErrorResponse(response1)
     getErrorResponse(response2)
@@ -260,13 +262,15 @@ class StandaloneRestSubmitSuite extends FunSuite with BeforeAndAfterEach {
     getErrorResponse(response5)
     getErrorResponse(response6)
     getErrorResponse(response7)
+    getErrorResponse(response8)
     assert(code1 === HttpServletResponse.SC_BAD_REQUEST)
     assert(code2 === HttpServletResponse.SC_BAD_REQUEST)
     assert(code3 === HttpServletResponse.SC_BAD_REQUEST)
     assert(code4 === HttpServletResponse.SC_BAD_REQUEST)
     assert(code5 === HttpServletResponse.SC_BAD_REQUEST)
     assert(code6 === HttpServletResponse.SC_BAD_REQUEST)
     assert(code7 === HttpServletResponse.SC_BAD_REQUEST)
+    assert(code8 === HttpServletResponse.SC_BAD_REQUEST)
   }
 
   test("bad request paths") {

core/src/test/scala/org/apache/spark/input/WholeTextFileRecordReaderSuite.scala

Lines changed: 10 additions & 2 deletions
@@ -28,7 +28,7 @@ import org.scalatest.FunSuite
 
 import org.apache.hadoop.io.Text
 
-import org.apache.spark.SparkContext
+import org.apache.spark.{SparkConf, SparkContext}
 import org.apache.spark.util.Utils
 import org.apache.hadoop.io.compress.{DefaultCodec, CompressionCodecFactory, GzipCodec}
 
@@ -42,7 +42,15 @@ class WholeTextFileRecordReaderSuite extends FunSuite with BeforeAndAfterAll {
   private var factory: CompressionCodecFactory = _
 
   override def beforeAll() {
-    sc = new SparkContext("local", "test")
+    // Hadoop's FileSystem caching does not use the Configuration as part of its cache key, which
+    // can cause Filesystem.get(Configuration) to return a cached instance created with a different
+    // configuration than the one passed to get() (see HADOOP-8490 for more details). This caused
+    // hard-to-reproduce test failures, since any suites that were run after this one would inherit
+    // the new value of "fs.local.block.size" (see SPARK-5227 and SPARK-5679). To work around this,
+    // we disable FileSystem caching in this suite.
+    val conf = new SparkConf().set("spark.hadoop.fs.file.impl.disable.cache", "true")
+
+    sc = new SparkContext("local", "test", conf)
 
     // Set the block size of local file system to test whether files are split right or not.
     sc.hadoopConfiguration.setLong("fs.local.block.size", 32)
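Two details behind the fix: SparkConf keys prefixed with spark.hadoop. are copied into the Hadoop Configuration (so the setting above becomes fs.file.impl.disable.cache=true on the Hadoop side), and Hadoop's FileSystem cache is keyed by scheme/authority/user rather than by the Configuration, which is how a block-size tweak could leak into later suites. A small sketch of that cache behaviour using plain Hadoop APIs, no Spark involved:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem

object FsCacheDemo {
  def main(args: Array[String]): Unit = {
    // Two different Configurations, same "file" scheme: with caching on,
    // FileSystem.get returns the same instance for both, so settings bleed across callers.
    val confA = new Configuration()
    val confB = new Configuration()
    confB.setLong("fs.local.block.size", 32)
    println(FileSystem.get(confA) eq FileSystem.get(confB)) // true: cached by scheme, not by conf

    // Disabling the cache for the "file" scheme gives each caller its own instance.
    confA.setBoolean("fs.file.impl.disable.cache", true)
    confB.setBoolean("fs.file.impl.disable.cache", true)
    println(FileSystem.get(confA) eq FileSystem.get(confB)) // false: fresh instances
  }
}
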
Lines changed: 100 additions & 0 deletions
@@ -0,0 +1,100 @@
+0.24579296,0.01
+0.28505864,0.02
+0.31208567,0.03
+0.35900051,0.04
+0.35747068,0.05
+0.16675166,0.06
+0.17491076,0.07
+0.04181540,0.08
+0.04793473,0.09
+0.03926568,0.10
+0.12952575,0.11
+0.00000000,0.12
+0.01376849,0.13
+0.13105558,0.14
+0.08873024,0.15
+0.12595614,0.16
+0.15247323,0.17
+0.25956145,0.18
+0.20040796,0.19
+0.19581846,0.20
+0.15757267,0.21
+0.13717491,0.22
+0.19020908,0.23
+0.19581846,0.24
+0.20091790,0.25
+0.16879143,0.26
+0.18510964,0.27
+0.20040796,0.28
+0.29576747,0.29
+0.43396226,0.30
+0.53391127,0.31
+0.52116267,0.32
+0.48546660,0.33
+0.49209587,0.34
+0.54156043,0.35
+0.59765426,0.36
+0.56144824,0.37
+0.58592555,0.38
+0.52983172,0.39
+0.50178480,0.40
+0.52626211,0.41
+0.58286588,0.42
+0.64660887,0.43
+0.68077511,0.44
+0.74298827,0.45
+0.64864865,0.46
+0.67261601,0.47
+0.65782764,0.48
+0.69811321,0.49
+0.63029067,0.50
+0.61601224,0.51
+0.63233044,0.52
+0.65323814,0.53
+0.65323814,0.54
+0.67363590,0.55
+0.67006629,0.56
+0.51555329,0.57
+0.50892402,0.58
+0.33299337,0.59
+0.36206017,0.60
+0.43090260,0.61
+0.45996940,0.62
+0.56348802,0.63
+0.54920959,0.64
+0.48393677,0.65
+0.48495665,0.66
+0.46965834,0.67
+0.45181030,0.68
+0.45843957,0.69
+0.47118817,0.70
+0.51555329,0.71
+0.58031617,0.72
+0.55481897,0.73
+0.56297807,0.74
+0.56603774,0.75
+0.57929628,0.76
+0.64762876,0.77
+0.66241713,0.78
+0.69301377,0.79
+0.65119837,0.80
+0.68332483,0.81
+0.66598674,0.82
+0.73890872,0.83
+0.73992861,0.84
+0.84242733,0.85
+0.91330954,0.86
+0.88016318,0.87
+0.90719021,0.88
+0.93115757,0.89
+0.93115757,0.90
+0.91942886,0.91
+0.92911780,0.92
+0.95665477,0.93
+0.95002550,0.94
+0.96940337,0.95
+1.00000000,0.96
+0.89801122,0.97
+0.90311066,0.98
+0.90362060,0.99
+0.83477817,1.0
