
Commit e1c970d

merged master

2 parents: abf2901 + 1fcd5dc

File tree

249 files changed: +2742 −1013 lines


README.md

Lines changed: 9 additions & 2 deletions
```diff
@@ -1,6 +1,13 @@
 # Apache Spark
 
-Lightning-Fast Cluster Computing - <http://spark.apache.org/>
+Spark is a fast and general cluster computing system for Big Data. It provides
+high-level APIs in Scala, Java, and Python, and an optimized engine that
+supports general computation graphs for data analysis. It also supports a
+rich set of higher-level tools including Spark SQL for SQL and structured
+data processing, MLLib for machine learning, GraphX for graph processing,
+and Spark Streaming.
+
+<http://spark.apache.org/>
 
 
 ## Online Documentation
@@ -81,7 +88,7 @@ versions without YARN, use:
     $ sbt/sbt -Dhadoop.version=2.0.0-mr1-cdh4.2.0 assembly
 
 For Apache Hadoop 2.2.X, 2.1.X, 2.0.X, 0.23.x, Cloudera CDH MRv2, and other Hadoop versions
-with YARN, also set `SPARK_YARN=true`:
+with YARN, also set `-Pyarn`:
 
     # Apache Hadoop 2.0.5-alpha
     $ sbt/sbt -Dhadoop.version=2.0.5-alpha -Pyarn assembly
```

core/pom.xml

Lines changed: 4 additions & 0 deletions
```diff
@@ -114,6 +114,10 @@
       <groupId>org.xerial.snappy</groupId>
       <artifactId>snappy-java</artifactId>
     </dependency>
+    <dependency>
+      <groupId>net.jpountz.lz4</groupId>
+      <artifactId>lz4</artifactId>
+    </dependency>
     <dependency>
       <groupId>com.twitter</groupId>
       <artifactId>chill_${scala.binary.version}</artifactId>
```
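The new lz4 dependency presumably backs an LZ4-based block compression codec, though the codec itself is not shown in this hunk. A minimal sketch of selecting such a codec through Spark configuration, assuming the class name `org.apache.spark.io.LZ4CompressionCodec`:

```scala
import org.apache.spark.SparkConf

// Sketch only: the codec class name below is an assumption; this diff
// shows the lz4 dependency being added, not the codec that uses it.
val conf = new SparkConf()
  .setAppName("lz4-codec-demo")
  .set("spark.io.compression.codec", "org.apache.spark.io.LZ4CompressionCodec")
```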

core/src/main/scala/org/apache/spark/Aggregator.scala

Lines changed: 4 additions & 4 deletions
```diff
@@ -56,8 +56,8 @@ case class Aggregator[K, V, C] (
     } else {
       val combiners = new ExternalAppendOnlyMap[K, V, C](createCombiner, mergeValue, mergeCombiners)
       while (iter.hasNext) {
-        val (k, v) = iter.next()
-        combiners.insert(k, v)
+        val pair = iter.next()
+        combiners.insert(pair._1, pair._2)
       }
       // TODO: Make this non optional in a future release
       Option(context).foreach(c => c.taskMetrics.memoryBytesSpilled = combiners.memoryBytesSpilled)
@@ -85,8 +85,8 @@ case class Aggregator[K, V, C] (
     } else {
       val combiners = new ExternalAppendOnlyMap[K, C, C](identity, mergeCombiners, mergeCombiners)
       while (iter.hasNext) {
-        val (k, c) = iter.next()
-        combiners.insert(k, c)
+        val pair = iter.next()
+        combiners.insert(pair._1, pair._2)
       }
       // TODO: Make this non optional in a future release
       Option(context).foreach(c => c.taskMetrics.memoryBytesSpilled = combiners.memoryBytesSpilled)
```
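The motivation appears to be per-element overhead: `val (k, v) = iter.next()` desugars into a Tuple2 pattern match on every iteration, while `pair._1`/`pair._2` are plain field reads. A standalone sketch of the pattern, with hypothetical names:

```scala
// Standalone sketch (hypothetical names): read tuple fields directly
// instead of destructuring, mirroring the change in the hot loop above.
object PairLoopDemo {
  def main(args: Array[String]): Unit = {
    val iter = Iterator(("a", 1), ("b", 2), ("a", 3))
    val sums = scala.collection.mutable.Map.empty[String, Int].withDefaultValue(0)
    while (iter.hasNext) {
      val pair = iter.next()                    // keep the Tuple2 as-is
      sums(pair._1) = sums(pair._1) + pair._2   // field access, no pattern match
    }
    println(sums)                               // Map(a -> 4, b -> 2)
  }
}
```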

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 10 additions & 1 deletion
```diff
@@ -1531,7 +1531,16 @@ object SparkContext extends Logging {
           throw new SparkException("YARN mode not available ?", e)
         }
       }
-      val backend = new CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem)
+      val backend = try {
+        val clazz =
+          Class.forName("org.apache.spark.scheduler.cluster.YarnClusterSchedulerBackend")
+        val cons = clazz.getConstructor(classOf[TaskSchedulerImpl], classOf[SparkContext])
+        cons.newInstance(scheduler, sc).asInstanceOf[CoarseGrainedSchedulerBackend]
+      } catch {
+        case e: Exception => {
+          throw new SparkException("YARN mode not available ?", e)
+        }
+      }
       scheduler.initialize(backend)
       scheduler
```
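The pattern here: the YARN backend class lives in an optional module, so it is resolved reflectively and a descriptive `SparkException` is raised if the class is absent instead of a raw `ClassNotFoundException`. A simplified, hypothetical helper sketching the same technique:

```scala
// Hypothetical helper (not Spark API): resolve a possibly-absent class,
// pick a constructor by its declared parameter types, and fail with a
// descriptive message if the class or constructor cannot be found.
def reflectiveNew[T](className: String,
                     paramTypes: Seq[Class[_]],
                     args: Seq[AnyRef]): T =
  try {
    val clazz = Class.forName(className)
    val cons = clazz.getConstructor(paramTypes: _*)
    cons.newInstance(args: _*).asInstanceOf[T]
  } catch {
    case e: Exception =>
      throw new RuntimeException(s"$className not available on the classpath?", e)
  }
```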

core/src/main/scala/org/apache/spark/TaskEndReason.scala

Lines changed: 3 additions & 2 deletions
```diff
@@ -89,8 +89,9 @@ case class ExceptionFailure(
     metrics: Option[TaskMetrics])
   extends TaskFailedReason {
   override def toErrorString: String = {
-    val stackTraceString = if (stackTrace == null) "null" else stackTrace.mkString("\n")
-    s"$className ($description}\n$stackTraceString"
+    val stackTraceString =
+      if (stackTrace == null) "null" else stackTrace.map(" " + _).mkString("\n")
+    s"$className ($description)\n$stackTraceString"
   }
 }
```
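Two fixes in one: the stray `}` after `$description` becomes a closing parenthesis, and each stack-trace element now gets a leading indent. A runnable snippet showing the corrected output shape:

```scala
// Runnable sketch of the corrected format: "<class> (<message>)" followed
// by indented stack-trace lines.
val e = new IllegalStateException("boom")
val stackTraceString = e.getStackTrace.map(" " + _).mkString("\n")
println(s"${e.getClass.getName} (${e.getMessage})\n$stackTraceString")
```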

core/src/main/scala/org/apache/spark/TestUtils.scala

Lines changed: 2 additions & 2 deletions
```diff
@@ -92,8 +92,8 @@ private[spark] object TestUtils {
   def createCompiledClass(className: String, destDir: File, value: String = ""): File = {
     val compiler = ToolProvider.getSystemJavaCompiler
     val sourceFile = new JavaSourceFromString(className,
-      "public class " + className + " { @Override public String toString() { " +
-        "return \"" + value + "\";}}")
+      "public class " + className + " implements java.io.Serializable {" +
+      " @Override public String toString() { return \"" + value + "\"; }}")
 
     // Calling this outputs a class file in pwd. It's easier to just rename the file than
     // build a custom FileManager that controls the output location.
```

core/src/main/scala/org/apache/spark/broadcast/Broadcast.scala

Lines changed: 4 additions & 4 deletions
```diff
@@ -106,23 +106,23 @@ abstract class Broadcast[T: ClassTag](val id: Long) extends Serializable {
    * Actually get the broadcasted value. Concrete implementations of Broadcast class must
    * define their own way to get the value.
    */
-  private[spark] def getValue(): T
+  protected def getValue(): T
 
   /**
    * Actually unpersist the broadcasted value on the executors. Concrete implementations of
    * Broadcast class must define their own logic to unpersist their own data.
    */
-  private[spark] def doUnpersist(blocking: Boolean)
+  protected def doUnpersist(blocking: Boolean)
 
   /**
    * Actually destroy all data and metadata related to this broadcast variable.
    * Implementation of Broadcast class must define their own logic to destroy their own
    * state.
    */
-  private[spark] def doDestroy(blocking: Boolean)
+  protected def doDestroy(blocking: Boolean)
 
   /** Check if this broadcast is valid. If not valid, exception is thrown. */
-  private[spark] def assertValid() {
+  protected def assertValid() {
     if (!_isValid) {
       throw new SparkException("Attempted to use %s after it has been destroyed!".format(toString))
     }
```
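Loosening `private[spark]` to `protected` means these hooks can now be implemented by subclasses outside the `org.apache.spark` package. A hypothetical, no-op subclass illustrating the newly available extension point:

```scala
import scala.reflect.ClassTag
import org.apache.spark.broadcast.Broadcast

// Hypothetical subclass (not part of Spark): with `protected` members,
// third-party code can supply its own Broadcast implementation.
class ConstantBroadcast[T: ClassTag](id: Long, v: T) extends Broadcast[T](id) {
  override protected def getValue(): T = v
  override protected def doUnpersist(blocking: Boolean): Unit = ()
  override protected def doDestroy(blocking: Boolean): Unit = ()
}
```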

core/src/main/scala/org/apache/spark/broadcast/BroadcastManager.scala

Lines changed: 1 addition & 1 deletion
```diff
@@ -39,7 +39,7 @@ private[spark] class BroadcastManager(
     synchronized {
       if (!initialized) {
         val broadcastFactoryClass =
-          conf.get("spark.broadcast.factory", "org.apache.spark.broadcast.HttpBroadcastFactory")
+          conf.get("spark.broadcast.factory", "org.apache.spark.broadcast.TorrentBroadcastFactory")
 
         broadcastFactory =
           Class.forName(broadcastFactoryClass).newInstance.asInstanceOf[BroadcastFactory]
```
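This flips the default broadcast implementation from HTTP to Torrent. A short sketch of pinning the old behavior explicitly, using the config key and class name from the diff above:

```scala
import org.apache.spark.SparkConf

// Explicitly restore the previous default if the HTTP implementation
// is still desired; key and class name are taken from the diff above.
val conf = new SparkConf()
  .set("spark.broadcast.factory", "org.apache.spark.broadcast.HttpBroadcastFactory")
```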

core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala

Lines changed: 7 additions & 7 deletions
```diff
@@ -40,9 +40,9 @@ private[spark] class HttpBroadcast[T: ClassTag](
     @transient var value_ : T, isLocal: Boolean, id: Long)
   extends Broadcast[T](id) with Logging with Serializable {
 
-  def getValue = value_
+  override protected def getValue() = value_
 
-  val blockId = BroadcastBlockId(id)
+  private val blockId = BroadcastBlockId(id)
 
   /*
    * Broadcasted data is also stored in the BlockManager of the driver. The BlockManagerMaster
@@ -60,14 +60,14 @@ private[spark] class HttpBroadcast[T: ClassTag](
   /**
   * Remove all persisted state associated with this HTTP broadcast on the executors.
   */
-  def doUnpersist(blocking: Boolean) {
+  override protected def doUnpersist(blocking: Boolean) {
     HttpBroadcast.unpersist(id, removeFromDriver = false, blocking)
   }
 
  /**
   * Remove all persisted state associated with this HTTP broadcast on the executors and driver.
   */
-  def doDestroy(blocking: Boolean) {
+  override protected def doDestroy(blocking: Boolean) {
     HttpBroadcast.unpersist(id, removeFromDriver = true, blocking)
  }
 
@@ -102,7 +102,7 @@ private[spark] class HttpBroadcast[T: ClassTag](
   }
 }
 
-private[spark] object HttpBroadcast extends Logging {
+private[broadcast] object HttpBroadcast extends Logging {
   private var initialized = false
   private var broadcastDir: File = null
   private var compress: Boolean = false
@@ -160,7 +160,7 @@ private[spark] object HttpBroadcast extends Logging {
 
   def getFile(id: Long) = new File(broadcastDir, BroadcastBlockId(id).name)
 
-  def write(id: Long, value: Any) {
+  private def write(id: Long, value: Any) {
     val file = getFile(id)
     val out: OutputStream = {
       if (compress) {
@@ -176,7 +176,7 @@ private[spark] object HttpBroadcast extends Logging {
     files += file
   }
 
-  def read[T: ClassTag](id: Long): T = {
+  private def read[T: ClassTag](id: Long): T = {
     logDebug("broadcast read server: " + serverUri + " id: broadcast-" + id)
     val url = serverUri + "/" + BroadcastBlockId(id).name
```

core/src/main/scala/org/apache/spark/broadcast/HttpBroadcastFactory.scala

Lines changed: 4 additions & 4 deletions
```diff
@@ -27,21 +27,21 @@ import org.apache.spark.{SecurityManager, SparkConf}
  * [[org.apache.spark.broadcast.HttpBroadcast]] for more details about this mechanism.
  */
 class HttpBroadcastFactory extends BroadcastFactory {
-  def initialize(isDriver: Boolean, conf: SparkConf, securityMgr: SecurityManager) {
+  override def initialize(isDriver: Boolean, conf: SparkConf, securityMgr: SecurityManager) {
     HttpBroadcast.initialize(isDriver, conf, securityMgr)
   }
 
-  def newBroadcast[T: ClassTag](value_ : T, isLocal: Boolean, id: Long) =
+  override def newBroadcast[T: ClassTag](value_ : T, isLocal: Boolean, id: Long) =
     new HttpBroadcast[T](value_, isLocal, id)
 
-  def stop() { HttpBroadcast.stop() }
+  override def stop() { HttpBroadcast.stop() }
 
   /**
    * Remove all persisted state associated with the HTTP broadcast with the given ID.
    * @param removeFromDriver Whether to remove state from the driver
    * @param blocking Whether to block until unbroadcasted
    */
-  def unbroadcast(id: Long, removeFromDriver: Boolean, blocking: Boolean) {
+  override def unbroadcast(id: Long, removeFromDriver: Boolean, blocking: Boolean) {
     HttpBroadcast.unpersist(id, removeFromDriver, blocking)
   }
 }
```

core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala

Lines changed: 17 additions & 16 deletions
```diff
@@ -20,7 +20,6 @@ package org.apache.spark.broadcast
 import java.io.{ByteArrayInputStream, ObjectInputStream, ObjectOutputStream}
 
 import scala.reflect.ClassTag
-import scala.math
 import scala.util.Random
 
 import org.apache.spark.{Logging, SparkConf, SparkEnv, SparkException}
@@ -49,19 +48,19 @@ private[spark] class TorrentBroadcast[T: ClassTag](
     @transient var value_ : T, isLocal: Boolean, id: Long)
   extends Broadcast[T](id) with Logging with Serializable {
 
-  def getValue = value_
+  override protected def getValue() = value_
 
-  val broadcastId = BroadcastBlockId(id)
+  private val broadcastId = BroadcastBlockId(id)
 
   TorrentBroadcast.synchronized {
     SparkEnv.get.blockManager.putSingle(
       broadcastId, value_, StorageLevel.MEMORY_AND_DISK, tellMaster = false)
   }
 
-  @transient var arrayOfBlocks: Array[TorrentBlock] = null
-  @transient var totalBlocks = -1
-  @transient var totalBytes = -1
-  @transient var hasBlocks = 0
+  @transient private var arrayOfBlocks: Array[TorrentBlock] = null
+  @transient private var totalBlocks = -1
+  @transient private var totalBytes = -1
+  @transient private var hasBlocks = 0
 
   if (!isLocal) {
     sendBroadcast()
@@ -70,19 +69,19 @@ private[spark] class TorrentBroadcast[T: ClassTag](
   /**
   * Remove all persisted state associated with this Torrent broadcast on the executors.
   */
-  def doUnpersist(blocking: Boolean) {
+  override protected def doUnpersist(blocking: Boolean) {
     TorrentBroadcast.unpersist(id, removeFromDriver = false, blocking)
   }
 
  /**
   * Remove all persisted state associated with this Torrent broadcast on the executors
   * and driver.
   */
-  def doDestroy(blocking: Boolean) {
+  override protected def doDestroy(blocking: Boolean) {
     TorrentBroadcast.unpersist(id, removeFromDriver = true, blocking)
   }
 
-  def sendBroadcast() {
+  private def sendBroadcast() {
     val tInfo = TorrentBroadcast.blockifyObject(value_)
     totalBlocks = tInfo.totalBlocks
     totalBytes = tInfo.totalBytes
@@ -159,7 +158,7 @@ private[spark] class TorrentBroadcast[T: ClassTag](
     hasBlocks = 0
   }
 
-  def receiveBroadcast(): Boolean = {
+  private def receiveBroadcast(): Boolean = {
     // Receive meta-info about the size of broadcast data,
     // the number of chunks it is divided into, etc.
     val metaId = BroadcastBlockId(id, "meta")
@@ -211,7 +210,7 @@ private[spark] class TorrentBroadcast[T: ClassTag](
 
 }
 
-private[spark] object TorrentBroadcast extends Logging {
+private[broadcast] object TorrentBroadcast extends Logging {
   private lazy val BLOCK_SIZE = conf.getInt("spark.broadcast.blockSize", 4096) * 1024
   private var initialized = false
   private var conf: SparkConf = null
@@ -272,17 +271,19 @@ private[spark] object TorrentBroadcast extends Logging {
    * Remove all persisted blocks associated with this torrent broadcast on the executors.
    * If removeFromDriver is true, also remove these persisted blocks on the driver.
    */
-  def unpersist(id: Long, removeFromDriver: Boolean, blocking: Boolean) = synchronized {
-    SparkEnv.get.blockManager.master.removeBroadcast(id, removeFromDriver, blocking)
+  def unpersist(id: Long, removeFromDriver: Boolean, blocking: Boolean) = {
+    synchronized {
+      SparkEnv.get.blockManager.master.removeBroadcast(id, removeFromDriver, blocking)
+    }
   }
 }
 
-private[spark] case class TorrentBlock(
+private[broadcast] case class TorrentBlock(
     blockID: Int,
     byteArray: Array[Byte])
   extends Serializable
 
-private[spark] case class TorrentInfo(
+private[broadcast] case class TorrentInfo(
     @transient arrayOfBlocks: Array[TorrentBlock],
     totalBlocks: Int,
     totalBytes: Int)
```
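An aside on the `unpersist` rewrite: `def f() = synchronized { ... }` and an explicit `synchronized` block inside the body both lock the enclosing object's monitor, so the change is behavior-preserving; it only makes the method boundary and the locked region visually distinct. A tiny standalone illustration:

```scala
// Both methods lock Counter's monitor; the second style mirrors the
// rewritten unpersist, separating method structure from the locked region.
object Counter {
  private var n = 0
  def incBefore(): Int = synchronized { n += 1; n }  // style before the change
  def incAfter(): Int = {                            // style after the change
    synchronized {
      n += 1
      n
    }
  }
}
```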

core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcastFactory.scala

Lines changed: 4 additions & 4 deletions
```diff
@@ -28,21 +28,21 @@ import org.apache.spark.{SecurityManager, SparkConf}
  */
 class TorrentBroadcastFactory extends BroadcastFactory {
 
-  def initialize(isDriver: Boolean, conf: SparkConf, securityMgr: SecurityManager) {
+  override def initialize(isDriver: Boolean, conf: SparkConf, securityMgr: SecurityManager) {
     TorrentBroadcast.initialize(isDriver, conf)
   }
 
-  def newBroadcast[T: ClassTag](value_ : T, isLocal: Boolean, id: Long) =
+  override def newBroadcast[T: ClassTag](value_ : T, isLocal: Boolean, id: Long) =
     new TorrentBroadcast[T](value_, isLocal, id)
 
-  def stop() { TorrentBroadcast.stop() }
+  override def stop() { TorrentBroadcast.stop() }
 
   /**
    * Remove all persisted state associated with the torrent broadcast with the given ID.
    * @param removeFromDriver Whether to remove state from the driver.
    * @param blocking Whether to block until unbroadcasted
    */
-  def unbroadcast(id: Long, removeFromDriver: Boolean, blocking: Boolean) {
+  override def unbroadcast(id: Long, removeFromDriver: Boolean, blocking: Boolean) {
     TorrentBroadcast.unpersist(id, removeFromDriver, blocking)
   }
 }
```
