Commit e42c452: merge master
2 parents: 93518fb + 942847f

112 files changed: +3892, -855 lines


bin/pyspark (+38, -13)

@@ -50,22 +50,47 @@ fi
 
 . "$FWDIR"/bin/load-spark-env.sh
 
-# Figure out which Python executable to use
+# In Spark <= 1.1, setting IPYTHON=1 would cause the driver to be launched using the `ipython`
+# executable, while the worker would still be launched using PYSPARK_PYTHON.
+#
+# In Spark 1.2, we removed the documentation of the IPYTHON and IPYTHON_OPTS variables and added
+# PYSPARK_DRIVER_PYTHON and PYSPARK_DRIVER_PYTHON_OPTS to allow IPython to be used for the driver.
+# Now, users can simply set PYSPARK_DRIVER_PYTHON=ipython to use IPython and set
+# PYSPARK_DRIVER_PYTHON_OPTS to pass options when starting the Python driver
+# (e.g. PYSPARK_DRIVER_PYTHON_OPTS='notebook'). This supports full customization of the IPython
+# and executor Python executables.
+#
+# For backwards-compatibility, we retain the old IPYTHON and IPYTHON_OPTS variables.
+
+# Determine the Python executable to use if PYSPARK_PYTHON or PYSPARK_DRIVER_PYTHON isn't set:
+if hash python2.7 2>/dev/null; then
+  # Attempt to use Python 2.7, if installed:
+  DEFAULT_PYTHON="python2.7"
+else
+  DEFAULT_PYTHON="python"
+fi
+
+# Determine the Python executable to use for the driver:
+if [[ -n "$IPYTHON_OPTS" || "$IPYTHON" == "1" ]]; then
+  # If IPython options are specified, assume user wants to run IPython
+  # (for backwards-compatibility)
+  PYSPARK_DRIVER_PYTHON_OPTS="$PYSPARK_DRIVER_PYTHON_OPTS $IPYTHON_OPTS"
+  PYSPARK_DRIVER_PYTHON="ipython"
+elif [[ -z "$PYSPARK_DRIVER_PYTHON" ]]; then
+  PYSPARK_DRIVER_PYTHON="${PYSPARK_PYTHON:-"$DEFAULT_PYTHON"}"
+fi
+
+# Determine the Python executable to use for the executors:
 if [[ -z "$PYSPARK_PYTHON" ]]; then
-  if [[ "$IPYTHON" = "1" || -n "$IPYTHON_OPTS" ]]; then
-    # for backward compatibility
-    PYSPARK_PYTHON="ipython"
+  if [[ $PYSPARK_DRIVER_PYTHON == *ipython* && $DEFAULT_PYTHON != "python2.7" ]]; then
+    echo "IPython requires Python 2.7+; please install python2.7 or set PYSPARK_PYTHON" 1>&2
+    exit 1
   else
-    PYSPARK_PYTHON="python"
+    PYSPARK_PYTHON="$DEFAULT_PYTHON"
   fi
 fi
 export PYSPARK_PYTHON
 
-if [[ -z "$PYSPARK_PYTHON_OPTS" && -n "$IPYTHON_OPTS" ]]; then
-  # for backward compatibility
-  PYSPARK_PYTHON_OPTS="$IPYTHON_OPTS"
-fi
-
 # Add the PySpark classes to the Python path:
 export PYTHONPATH="$SPARK_HOME/python/:$PYTHONPATH"
 export PYTHONPATH="$SPARK_HOME/python/lib/py4j-0.8.2.1-src.zip:$PYTHONPATH"
@@ -93,9 +118,9 @@ if [[ -n "$SPARK_TESTING" ]]; then
   unset YARN_CONF_DIR
   unset HADOOP_CONF_DIR
   if [[ -n "$PYSPARK_DOC_TEST" ]]; then
-    exec "$PYSPARK_PYTHON" -m doctest $1
+    exec "$PYSPARK_DRIVER_PYTHON" -m doctest $1
   else
-    exec "$PYSPARK_PYTHON" $1
+    exec "$PYSPARK_DRIVER_PYTHON" $1
   fi
   exit
 fi
@@ -111,5 +136,5 @@ if [[ "$1" =~ \.py$ ]]; then
 else
   # PySpark shell requires special handling downstream
   export PYSPARK_SHELL=1
-  exec "$PYSPARK_PYTHON" $PYSPARK_PYTHON_OPTS
+  exec "$PYSPARK_DRIVER_PYTHON" $PYSPARK_DRIVER_PYTHON_OPTS
 fi
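
With these changes, launching the shell under IPython becomes a matter of environment variables rather than a special code path: for example, running PYSPARK_DRIVER_PYTHON=ipython PYSPARK_DRIVER_PYTHON_OPTS='notebook' ./bin/pyspark starts the driver through IPython's notebook while the executors continue to use PYSPARK_PYTHON, and the old IPYTHON and IPYTHON_OPTS variables keep working for backwards compatibility.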

core/src/main/scala/org/apache/spark/SparkContext.scala (+5, -1)

@@ -21,6 +21,7 @@ import scala.language.implicitConversions
 
 import java.io._
 import java.net.URI
+import java.util.Arrays
 import java.util.concurrent.atomic.AtomicInteger
 import java.util.{Properties, UUID}
 import java.util.UUID.randomUUID
@@ -1429,7 +1430,10 @@ object SparkContext extends Logging {
     simpleWritableConverter[Boolean, BooleanWritable](_.get)
 
   implicit def bytesWritableConverter(): WritableConverter[Array[Byte]] = {
-    simpleWritableConverter[Array[Byte], BytesWritable](_.getBytes)
+    simpleWritableConverter[Array[Byte], BytesWritable](bw =>
+      // getBytes method returns array which is longer then data to be returned
+      Arrays.copyOfRange(bw.getBytes, 0, bw.getLength)
+    )
   }
 
   implicit def stringWritableConverter(): WritableConverter[String] =
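
The converter fix reflects a Hadoop detail: BytesWritable keeps a resizable backing array, so getBytes can return more bytes than the writable logically holds, and only the first getLength bytes are valid. A minimal standalone sketch of the same trimming (illustrative names, assumes hadoop-common on the classpath):

    import java.util.Arrays
    import org.apache.hadoop.io.BytesWritable

    object BytesWritableTrim {
      // Copy only the valid prefix; the backing array may contain stale padding.
      def valueOf(bw: BytesWritable): Array[Byte] =
        Arrays.copyOfRange(bw.getBytes, 0, bw.getLength)

      def main(args: Array[String]): Unit = {
        val bw = new BytesWritable(Array[Byte](1, 2, 3))
        bw.setSize(2)  // logical length shrinks to 2; the backing array stays at length 3
        println(bw.getBytes.length)        // 3 (capacity)
        println(valueOf(bw).mkString(",")) // 1,2 (valid data only)
      }
    }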

core/src/main/scala/org/apache/spark/TestUtils.scala (+3, -2)

@@ -26,6 +26,8 @@ import scala.collection.JavaConversions._
 import javax.tools.{JavaFileObject, SimpleJavaFileObject, ToolProvider}
 import com.google.common.io.Files
 
+import org.apache.spark.util.Utils
+
 /**
  * Utilities for tests. Included in main codebase since it's used by multiple
  * projects.
@@ -42,8 +44,7 @@ private[spark] object TestUtils {
    * in order to avoid interference between tests.
    */
   def createJarWithClasses(classNames: Seq[String], value: String = ""): URL = {
-    val tempDir = Files.createTempDir()
-    tempDir.deleteOnExit()
+    val tempDir = Utils.createTempDir()
     val files = for (name <- classNames) yield createCompiledClass(name, tempDir, value)
     val jarFile = new File(tempDir, "testJar-%s.jar".format(System.currentTimeMillis()))
     createJar(files, jarFile)

core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala (+4, -6)

@@ -25,8 +25,6 @@ import java.util.{List => JList, ArrayList => JArrayList, Map => JMap, Collectio
 import scala.collection.JavaConversions._
 import scala.collection.mutable
 import scala.language.existentials
-import scala.reflect.ClassTag
-import scala.util.{Try, Success, Failure}
 
 import net.razorvine.pickle.{Pickler, Unpickler}
 
@@ -42,7 +40,7 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.util.Utils
 
 private[spark] class PythonRDD(
-    parent: RDD[_],
+    @transient parent: RDD[_],
     command: Array[Byte],
     envVars: JMap[String, String],
     pythonIncludes: JList[String],
@@ -55,9 +53,9 @@ private[spark] class PythonRDD(
   val bufferSize = conf.getInt("spark.buffer.size", 65536)
   val reuse_worker = conf.getBoolean("spark.python.worker.reuse", true)
 
-  override def getPartitions = parent.partitions
+  override def getPartitions = firstParent.partitions
 
-  override val partitioner = if (preservePartitoning) parent.partitioner else None
+  override val partitioner = if (preservePartitoning) firstParent.partitioner else None
 
   override def compute(split: Partition, context: TaskContext): Iterator[Array[Byte]] = {
     val startTime = System.currentTimeMillis
@@ -234,7 +232,7 @@ private[spark] class PythonRDD(
         dataOut.writeInt(command.length)
         dataOut.write(command)
         // Data values
-        PythonRDD.writeIteratorToStream(parent.iterator(split, context), dataOut)
+        PythonRDD.writeIteratorToStream(firstParent.iterator(split, context), dataOut)
         dataOut.writeInt(SpecialLengths.END_OF_DATA_SECTION)
         dataOut.flush()
       } catch {

core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala (+6, -2)

@@ -108,10 +108,12 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String
       serverSocket = new ServerSocket(0, 1, InetAddress.getByAddress(Array(127, 0, 0, 1)))
 
       // Create and start the worker
-      val pb = new ProcessBuilder(Seq(pythonExec, "-u", "-m", "pyspark.worker"))
+      val pb = new ProcessBuilder(Seq(pythonExec, "-m", "pyspark.worker"))
       val workerEnv = pb.environment()
       workerEnv.putAll(envVars)
       workerEnv.put("PYTHONPATH", pythonPath)
+      // This is equivalent to setting the -u flag; we use it because ipython doesn't support -u:
+      workerEnv.put("PYTHONUNBUFFERED", "YES")
       val worker = pb.start()
 
       // Redirect worker stdout and stderr
@@ -149,10 +151,12 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String
 
       try {
         // Create and start the daemon
-        val pb = new ProcessBuilder(Seq(pythonExec, "-u", "-m", "pyspark.daemon"))
+        val pb = new ProcessBuilder(Seq(pythonExec, "-m", "pyspark.daemon"))
         val workerEnv = pb.environment()
         workerEnv.putAll(envVars)
         workerEnv.put("PYTHONPATH", pythonPath)
+        // This is equivalent to setting the -u flag; we use it because ipython doesn't support -u:
+        workerEnv.put("PYTHONUNBUFFERED", "YES")
         daemon = pb.start()
 
         val in = new DataInputStream(daemon.getInputStream)
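
Dropping the -u flag and exporting PYTHONUNBUFFERED instead keeps worker output unbuffered while remaining compatible with interpreters, such as ipython, that do not accept -u. A rough sketch of the same pattern in isolation (illustrative names, assumes a python executable on the PATH):

    import scala.collection.JavaConversions._

    object UnbufferedChild {
      def main(args: Array[String]): Unit = {
        // Request unbuffered stdout/stderr via the environment instead of the "-u" flag.
        val pb = new ProcessBuilder(Seq("python", "-c", "print('hello from child')"))
        pb.environment().put("PYTHONUNBUFFERED", "YES")
        pb.redirectErrorStream(true)
        val proc = pb.start()
        scala.io.Source.fromInputStream(proc.getInputStream).getLines().foreach(println)
        proc.waitFor()
      }
    }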

core/src/main/scala/org/apache/spark/deploy/Client.scala (+1, -1)

@@ -130,7 +130,7 @@ private class ClientActor(driverArgs: ClientArguments, conf: SparkConf)
       println(s"Error connecting to master ${driverArgs.master} ($remoteAddress), exiting.")
       System.exit(-1)
 
-    case AssociationErrorEvent(cause, _, remoteAddress, _) =>
+    case AssociationErrorEvent(cause, _, remoteAddress, _, _) =>
       println(s"Error connecting to master ${driverArgs.master} ($remoteAddress), exiting.")
      println(s"Cause was: $cause")
       System.exit(-1)

core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala (+3, -1)

@@ -34,7 +34,8 @@ object PythonRunner {
     val pythonFile = args(0)
     val pyFiles = args(1)
     val otherArgs = args.slice(2, args.length)
-    val pythonExec = sys.env.get("PYSPARK_PYTHON").getOrElse("python") // TODO: get this from conf
+    val pythonExec =
+      sys.env.getOrElse("PYSPARK_DRIVER_PYTHON", sys.env.getOrElse("PYSPARK_PYTHON", "python"))
 
     // Format python file paths before adding them to the PYTHONPATH
     val formattedPythonFile = formatPath(pythonFile)
@@ -57,6 +58,7 @@ object PythonRunner {
     val builder = new ProcessBuilder(Seq(pythonExec, formattedPythonFile) ++ otherArgs)
     val env = builder.environment()
     env.put("PYTHONPATH", pythonPath)
+    // This is equivalent to setting the -u flag; we use it because ipython doesn't support -u:
     env.put("PYTHONUNBUFFERED", "YES") // value is needed to be set to a non-empty string
     env.put("PYSPARK_GATEWAY_PORT", "" + gatewayServer.getListeningPort)
     builder.redirectErrorStream(true) // Ugly but needed for stdout and stderr to synchronize

core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala (+1, -1)

@@ -154,7 +154,7 @@ private[spark] class AppClient(
         logWarning(s"Connection to $address failed; waiting for master to reconnect...")
         markDisconnected()
 
-      case AssociationErrorEvent(cause, _, address, _) if isPossibleMaster(address) =>
+      case AssociationErrorEvent(cause, _, address, _, _) if isPossibleMaster(address) =>
         logWarning(s"Could not connect to $address: $cause")
 
       case StopAppClient =>

core/src/main/scala/org/apache/spark/deploy/worker/WorkerWatcher.scala (+1, -1)

@@ -54,7 +54,7 @@ private[spark] class WorkerWatcher(workerUrl: String)
     case AssociatedEvent(localAddress, remoteAddress, inbound) if isWorker(remoteAddress) =>
       logInfo(s"Successfully connected to $workerUrl")
 
-    case AssociationErrorEvent(cause, localAddress, remoteAddress, inbound)
+    case AssociationErrorEvent(cause, localAddress, remoteAddress, inbound, _)
         if isWorker(remoteAddress) =>
       // These logs may not be seen if the worker (and associated pipe) has died
       logError(s"Could not initialize connection to worker $workerUrl. Exiting.")

core/src/main/scala/org/apache/spark/network/ManagedBuffer.scala (+15, -1)

@@ -66,13 +66,27 @@ sealed abstract class ManagedBuffer {
 final class FileSegmentManagedBuffer(val file: File, val offset: Long, val length: Long)
   extends ManagedBuffer {
 
+  /**
+   * Memory mapping is expensive and can destabilize the JVM (SPARK-1145, SPARK-3889).
+   * Avoid unless there's a good reason not to.
+   */
+  private val MIN_MEMORY_MAP_BYTES = 2 * 1024 * 1024;
+
   override def size: Long = length
 
   override def nioByteBuffer(): ByteBuffer = {
     var channel: FileChannel = null
     try {
       channel = new RandomAccessFile(file, "r").getChannel
-      channel.map(MapMode.READ_ONLY, offset, length)
+      // Just copy the buffer if it's sufficiently small, as memory mapping has a high overhead.
+      if (length < MIN_MEMORY_MAP_BYTES) {
+        val buf = ByteBuffer.allocate(length.toInt)
+        channel.read(buf, offset)
+        buf.flip()
+        buf
+      } else {
+        channel.map(MapMode.READ_ONLY, offset, length)
+      }
     } catch {
       case e: IOException =>
         Try(channel.size).toOption match {
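
The nioByteBuffer change trades a small heap copy for avoiding mmap on short segments: mapping has a fixed setup cost and the mapped memory is only reclaimed when the buffer is garbage collected, which is the kind of problem the SPARK-1145/SPARK-3889 references in the comment point to. A rough standalone sketch of the same decision (illustrative names; the 2 MB threshold mirrors the diff):

    import java.io.RandomAccessFile
    import java.nio.ByteBuffer
    import java.nio.channels.FileChannel.MapMode

    object SegmentReader {
      // Copy small segments into a heap buffer; memory-map only large ones.
      def readSegment(path: String, offset: Long, length: Long,
                      mapThreshold: Long = 2L * 1024 * 1024): ByteBuffer = {
        val channel = new RandomAccessFile(path, "r").getChannel
        try {
          if (length < mapThreshold) {
            val buf = ByteBuffer.allocate(length.toInt)
            channel.read(buf, offset)  // a full implementation would loop until buf is filled
            buf.flip()
            buf
          } else {
            channel.map(MapMode.READ_ONLY, offset, length)
          }
        } finally {
          channel.close()  // a mapping, if created, stays valid after the channel is closed
        }
      }
    }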

core/src/main/scala/org/apache/spark/network/nio/Connection.scala (+21, -14)

@@ -20,11 +20,14 @@ package org.apache.spark.network.nio
 import java.net._
 import java.nio._
 import java.nio.channels._
+import java.util.concurrent.ConcurrentLinkedQueue
 import java.util.LinkedList
 
 import org.apache.spark._
 
+import scala.collection.JavaConversions._
 import scala.collection.mutable.{ArrayBuffer, HashMap}
+import scala.util.control.NonFatal
 
 private[nio]
 abstract class Connection(val channel: SocketChannel, val selector: Selector,
@@ -51,7 +54,7 @@ abstract class Connection(val channel: SocketChannel, val selector: Selector,
 
   @volatile private var closed = false
   var onCloseCallback: Connection => Unit = null
-  var onExceptionCallback: (Connection, Exception) => Unit = null
+  val onExceptionCallbacks = new ConcurrentLinkedQueue[(Connection, Throwable) => Unit]
   var onKeyInterestChangeCallback: (Connection, Int) => Unit = null
 
   val remoteAddress = getRemoteAddress()
@@ -130,20 +133,24 @@ abstract class Connection(val channel: SocketChannel, val selector: Selector,
     onCloseCallback = callback
   }
 
-  def onException(callback: (Connection, Exception) => Unit) {
-    onExceptionCallback = callback
+  def onException(callback: (Connection, Throwable) => Unit) {
+    onExceptionCallbacks.add(callback)
   }
 
   def onKeyInterestChange(callback: (Connection, Int) => Unit) {
     onKeyInterestChangeCallback = callback
   }
 
-  def callOnExceptionCallback(e: Exception) {
-    if (onExceptionCallback != null) {
-      onExceptionCallback(this, e)
-    } else {
-      logError("Error in connection to " + getRemoteConnectionManagerId() +
-        " and OnExceptionCallback not registered", e)
+  def callOnExceptionCallbacks(e: Throwable) {
+    onExceptionCallbacks foreach {
+      callback =>
+        try {
+          callback(this, e)
+        } catch {
+          case NonFatal(e) => {
+            logWarning("Ignored error in onExceptionCallback", e)
+          }
+        }
     }
   }
 
@@ -323,7 +330,7 @@ class SendingConnection(val address: InetSocketAddress, selector_ : Selector,
     } catch {
      case e: Exception => {
         logError("Error connecting to " + address, e)
-        callOnExceptionCallback(e)
+        callOnExceptionCallbacks(e)
       }
     }
   }
@@ -348,7 +355,7 @@ class SendingConnection(val address: InetSocketAddress, selector_ : Selector,
     } catch {
      case e: Exception => {
        logWarning("Error finishing connection to " + address, e)
-        callOnExceptionCallback(e)
+        callOnExceptionCallbacks(e)
      }
    }
    true
@@ -393,7 +400,7 @@ class SendingConnection(val address: InetSocketAddress, selector_ : Selector,
       } catch {
         case e: Exception => {
           logWarning("Error writing in connection to " + getRemoteConnectionManagerId(), e)
-          callOnExceptionCallback(e)
+          callOnExceptionCallbacks(e)
           close()
           return false
         }
@@ -420,7 +427,7 @@ class SendingConnection(val address: InetSocketAddress, selector_ : Selector,
       case e: Exception =>
        logError("Exception while reading SendingConnection to " + getRemoteConnectionManagerId(),
          e)
-        callOnExceptionCallback(e)
+        callOnExceptionCallbacks(e)
        close()
    }
 
@@ -577,7 +584,7 @@ private[spark] class ReceivingConnection(
     } catch {
      case e: Exception => {
        logWarning("Error reading from connection to " + getRemoteConnectionManagerId(), e)
-        callOnExceptionCallback(e)
+        callOnExceptionCallbacks(e)
        close()
        return false
      }
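
Turning the single onExceptionCallback field into a ConcurrentLinkedQueue lets several components register error handlers on the same connection and widens the accepted type to Throwable; the per-callback try/catch keeps one misbehaving handler from suppressing the rest. A compact sketch of that pattern in isolation (illustrative names, println standing in for Spark's logging):

    import java.util.concurrent.ConcurrentLinkedQueue
    import scala.collection.JavaConversions._
    import scala.util.control.NonFatal

    object ErrorCallbacks {
      private val callbacks = new ConcurrentLinkedQueue[Throwable => Unit]

      def onError(cb: Throwable => Unit): Unit = callbacks.add(cb)

      def fireError(t: Throwable): Unit = callbacks.foreach { cb =>
        try cb(t) catch {
          case NonFatal(e) => println(s"Ignored error in callback: $e")
        }
      }

      def main(args: Array[String]): Unit = {
        onError(t => println(s"listener 1 saw: ${t.getMessage}"))
        onError(_ => throw new RuntimeException("broken listener"))
        onError(t => println(s"listener 3 saw: ${t.getMessage}"))
        fireError(new Exception("connection reset"))  // all three listeners are attempted
      }
    }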
