
Commit ffaa83d

Merge branch 'master' of git://git.apache.org/spark into SPARK-2583
2 parents: 12d3de8 + 75db174


197 files changed: +1204 additions, -346 deletions


.gitignore

Lines changed: 1 addition & 0 deletions
@@ -19,6 +19,7 @@ conf/spark-env.sh
 conf/streaming-env.sh
 conf/log4j.properties
 conf/spark-defaults.conf
+conf/hive-site.xml
 docs/_site
 docs/api
 target/

core/src/main/scala/org/apache/spark/Partitioner.scala

Lines changed: 2 additions & 2 deletions
@@ -134,8 +134,8 @@ class RangePartitioner[K : Ordering : ClassTag, V](
   def getPartition(key: Any): Int = {
     val k = key.asInstanceOf[K]
     var partition = 0
-    if (rangeBounds.length < 1000) {
-      // If we have less than 100 partitions naive search
+    if (rangeBounds.length <= 128) {
+      // If we have less than 128 partitions naive search
       while (partition < rangeBounds.length && ordering.gt(k, rangeBounds(partition))) {
        partition += 1
      }
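
For illustration, a minimal Scala sketch (not the Spark implementation; `findPartition`, `bounds`, and the Int keys are made up for the example) of the lookup strategy this hunk tunes: scan linearly while the number of range bounds is small, otherwise fall back to binary search.

// Hedged sketch: `bounds` is assumed to be sorted ascending.
def findPartition(bounds: Array[Int], key: Int): Int = {
  if (bounds.length <= 128) {
    // Few partitions: a simple linear scan is cheap.
    var p = 0
    while (p < bounds.length && key > bounds(p)) {
      p += 1
    }
    p
  } else {
    // Many partitions: binary search over the sorted bounds.
    val i = java.util.Arrays.binarySearch(bounds, key)
    if (i < 0) -i - 1 else i
  }
}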

core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala

Lines changed: 1 addition & 2 deletions
@@ -99,7 +99,6 @@ class TaskMetrics extends Serializable {
         existingMetrics.fetchWaitTime += newMetrics.fetchWaitTime
         existingMetrics.localBlocksFetched += newMetrics.localBlocksFetched
         existingMetrics.remoteBlocksFetched += newMetrics.remoteBlocksFetched
-        existingMetrics.totalBlocksFetched += newMetrics.totalBlocksFetched
         existingMetrics.remoteBytesRead += newMetrics.remoteBytesRead
       case None =>
         _shuffleReadMetrics = Some(newMetrics)
@@ -149,7 +148,7 @@ class ShuffleReadMetrics extends Serializable {
   /**
    * Number of blocks fetched in this shuffle by this task (remote or local)
    */
-  var totalBlocksFetched: Int = _
+  def totalBlocksFetched: Int = remoteBlocksFetched + localBlocksFetched

   /**
    * Number of remote blocks fetched in this shuffle by this task
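
For illustration, a minimal Scala sketch (hypothetical `ReadCounters` class, not Spark's ShuffleReadMetrics) of the pattern applied here: the total becomes a derived accessor over the two stored counters, so it can never drift out of sync with them.

// Hypothetical stand-in class; only the derived-accessor idea matters.
class ReadCounters {
  var remoteBlocksFetched: Int = 0
  var localBlocksFetched: Int = 0
  // Computed on read; no separate field to keep consistent.
  def totalBlocksFetched: Int = remoteBlocksFetched + localBlocksFetched
}

val c = new ReadCounters
c.remoteBlocksFetched = 800
c.localBlocksFetched = 700
assert(c.totalBlocksFetched == 1500)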

core/src/main/scala/org/apache/spark/rdd/RDD.scala

Lines changed: 48 additions & 4 deletions
@@ -1269,11 +1269,55 @@ abstract class RDD[T: ClassTag](

   /** A description of this RDD and its recursive dependencies for debugging. */
   def toDebugString: String = {
-    def debugString(rdd: RDD[_], prefix: String = ""): Seq[String] = {
-      Seq(prefix + rdd + " (" + rdd.partitions.size + " partitions)") ++
-        rdd.dependencies.flatMap(d => debugString(d.rdd, prefix + " "))
+    // Apply a different rule to the last child
+    def debugChildren(rdd: RDD[_], prefix: String): Seq[String] = {
+      val len = rdd.dependencies.length
+      len match {
+        case 0 => Seq.empty
+        case 1 =>
+          val d = rdd.dependencies.head
+          debugString(d.rdd, prefix, d.isInstanceOf[ShuffleDependency[_,_,_]], true)
+        case _ =>
+          val frontDeps = rdd.dependencies.take(len - 1)
+          val frontDepStrings = frontDeps.flatMap(
+            d => debugString(d.rdd, prefix, d.isInstanceOf[ShuffleDependency[_,_,_]]))
+
+          val lastDep = rdd.dependencies.last
+          val lastDepStrings =
+            debugString(lastDep.rdd, prefix, lastDep.isInstanceOf[ShuffleDependency[_,_,_]], true)
+
+          (frontDepStrings ++ lastDepStrings)
+      }
+    }
+    // The first RDD in the dependency stack has no parents, so no need for a +-
+    def firstDebugString(rdd: RDD[_]): Seq[String] = {
+      val partitionStr = "(" + rdd.partitions.size + ")"
+      val leftOffset = (partitionStr.length - 1) / 2
+      val nextPrefix = (" " * leftOffset) + "|" + (" " * (partitionStr.length - leftOffset))
+      Seq(partitionStr + " " + rdd) ++ debugChildren(rdd, nextPrefix)
+    }
+    def shuffleDebugString(rdd: RDD[_], prefix: String = "", isLastChild: Boolean): Seq[String] = {
+      val partitionStr = "(" + rdd.partitions.size + ")"
+      val leftOffset = (partitionStr.length - 1) / 2
+      val thisPrefix = prefix.replaceAll("\\|\\s+$", "")
+      val nextPrefix = (
+        thisPrefix
+        + (if (isLastChild) " " else "| ")
+        + (" " * leftOffset) + "|" + (" " * (partitionStr.length - leftOffset)))
+      Seq(thisPrefix + "+-" + partitionStr + " " + rdd) ++ debugChildren(rdd, nextPrefix)
+    }
+    def debugString(rdd: RDD[_],
+                    prefix: String = "",
+                    isShuffle: Boolean = true,
+                    isLastChild: Boolean = false): Seq[String] = {
+      if (isShuffle) {
+        shuffleDebugString(rdd, prefix, isLastChild)
+      }
+      else {
+        Seq(prefix + rdd) ++ debugChildren(rdd, prefix)
+      }
     }
-    debugString(this).mkString("\n")
+    firstDebugString(this).mkString("\n")
   }

   override def toString: String = "%s%s[%d] at %s".format(
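
For illustration, a usage sketch (assuming a live SparkContext `sc`, e.g. in spark-shell; the RDD names and data are made up) of where the new rendering is exercised. The exact text depends on the lineage; shuffle dependencies are drawn as "+-" children.

import org.apache.spark.SparkContext._  // pair-RDD functions such as reduceByKey

val words = sc.parallelize(Seq("a", "b", "a"), 4)
val counts = words.map(w => (w, 1)).reduceByKey(_ + _)
// Prints the lineage tree: partition counts in parentheses, shuffle
// boundaries rendered as "+-" children with "|" rails.
println(counts.toDebugString)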

core/src/main/scala/org/apache/spark/shuffle/hash/BlockStoreShuffleFetcher.scala

Lines changed: 0 additions & 1 deletion
@@ -81,7 +81,6 @@ private[hash] object BlockStoreShuffleFetcher extends Logging {
       shuffleMetrics.shuffleFinishTime = System.currentTimeMillis
       shuffleMetrics.fetchWaitTime = blockFetcherItr.fetchWaitTime
       shuffleMetrics.remoteBytesRead = blockFetcherItr.remoteBytesRead
-      shuffleMetrics.totalBlocksFetched = blockFetcherItr.totalBlocks
       shuffleMetrics.localBlocksFetched = blockFetcherItr.numLocalBlocks
       shuffleMetrics.remoteBlocksFetched = blockFetcherItr.numRemoteBlocks
       context.taskMetrics.updateShuffleReadMetrics(shuffleMetrics)

core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala

Lines changed: 1 addition & 3 deletions
@@ -46,7 +46,6 @@ import org.apache.spark.util.Utils
 private[storage]
 trait BlockFetcherIterator extends Iterator[(BlockId, Option[Iterator[Any]])] with Logging {
   def initialize()
-  def totalBlocks: Int
   def numLocalBlocks: Int
   def numRemoteBlocks: Int
   def fetchWaitTime: Long
@@ -199,7 +198,7 @@ object BlockFetcherIterator {
        }
      }
      logInfo("Getting " + _numBlocksToFetch + " non-empty blocks out of " +
-       totalBlocks + " blocks")
+       (numLocal + numRemote) + " blocks")
      remoteRequests
    }

@@ -242,7 +241,6 @@
      logDebug("Got local blocks in " + Utils.getUsedTimeMs(startTime) + " ms")
    }

-    override def totalBlocks: Int = numLocal + numRemote
    override def numLocalBlocks: Int = numLocal
    override def numRemoteBlocks: Int = numRemote
    override def fetchWaitTime: Long = _fetchWaitTime

core/src/main/scala/org/apache/spark/ui/ToolTips.scala

Lines changed: 3 additions & 3 deletions
@@ -20,9 +20,9 @@ package org.apache.spark.ui
 private[spark] object ToolTips {
   val SCHEDULER_DELAY =
     """Scheduler delay includes time to ship the task from the scheduler to
-      the executor, and time the time to send a message from the executor to the scheduler stating
-      that the task has completed. When the scheduler becomes overloaded, task completion messages
-      become queued up, and scheduler delay increases."""
+      the executor, and time to send the task result from the executor to the scheduler. If
+      scheduler delay is large, consider decreasing the size of tasks or decreasing the size
+      of task results."""

   val INPUT = "Bytes read from Hadoop or from Spark storage."

core/src/main/scala/org/apache/spark/util/CollectionsUtil.scala renamed to core/src/main/scala/org/apache/spark/util/CollectionsUtils.scala

Lines changed: 4 additions & 3 deletions
@@ -19,11 +19,11 @@ package org.apache.spark.util

 import java.util

-import scala.Array
-import scala.reflect._
+import scala.reflect.{classTag, ClassTag}

 private[spark] object CollectionsUtils {
   def makeBinarySearch[K : Ordering : ClassTag] : (Array[K], K) => Int = {
+    // For primitive keys, we can use the natural ordering. Otherwise, use the Ordering comparator.
     classTag[K] match {
       case ClassTag.Float =>
         (l, x) => util.Arrays.binarySearch(l.asInstanceOf[Array[Float]], x.asInstanceOf[Float])
@@ -40,7 +40,8 @@ private[spark] object CollectionsUtils {
       case ClassTag.Long =>
         (l, x) => util.Arrays.binarySearch(l.asInstanceOf[Array[Long]], x.asInstanceOf[Long])
       case _ =>
-        (l, x) => util.Arrays.binarySearch(l.asInstanceOf[Array[AnyRef]], x)
+        val comparator = implicitly[Ordering[K]].asInstanceOf[java.util.Comparator[Any]]
+        (l, x) => util.Arrays.binarySearch(l.asInstanceOf[Array[AnyRef]], x, comparator)
     }
   }
 }
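
For the non-primitive branch, a hedged standalone Scala sketch (the `Key` type and helper are hypothetical, not the Spark utility itself): scala.math.Ordering already extends java.util.Comparator, so it can drive Arrays.binarySearch for key types that are not Comparable.

import java.{util => ju}

// Hypothetical key type with an Ordering but no Comparable.
case class Key(value: Int)
implicit val keyOrdering: Ordering[Key] = Ordering.by((k: Key) => k.value)

def binarySearchByOrdering[K <: AnyRef](sorted: Array[K], x: K)(implicit ord: Ordering[K]): Int = {
  val comparator: ju.Comparator[K] = ord  // an Ordering[K] is a java.util.Comparator[K]
  ju.Arrays.binarySearch(sorted, x, comparator)
}

val keys = Array(Key(10), Key(20), Key(30))
assert(binarySearchByOrdering(keys, Key(20)) == 1)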

core/src/main/scala/org/apache/spark/util/JsonProtocol.scala

Lines changed: 0 additions & 2 deletions
@@ -237,7 +237,6 @@ private[spark] object JsonProtocol {

   def shuffleReadMetricsToJson(shuffleReadMetrics: ShuffleReadMetrics): JValue = {
     ("Shuffle Finish Time" -> shuffleReadMetrics.shuffleFinishTime) ~
-    ("Total Blocks Fetched" -> shuffleReadMetrics.totalBlocksFetched) ~
     ("Remote Blocks Fetched" -> shuffleReadMetrics.remoteBlocksFetched) ~
     ("Local Blocks Fetched" -> shuffleReadMetrics.localBlocksFetched) ~
     ("Fetch Wait Time" -> shuffleReadMetrics.fetchWaitTime) ~
@@ -548,7 +547,6 @@
   def shuffleReadMetricsFromJson(json: JValue): ShuffleReadMetrics = {
     val metrics = new ShuffleReadMetrics
     metrics.shuffleFinishTime = (json \ "Shuffle Finish Time").extract[Long]
-    metrics.totalBlocksFetched = (json \ "Total Blocks Fetched").extract[Int]
     metrics.remoteBlocksFetched = (json \ "Remote Blocks Fetched").extract[Int]
     metrics.localBlocksFetched = (json \ "Local Blocks Fetched").extract[Int]
     metrics.fetchWaitTime = (json \ "Fetch Wait Time").extract[Long]
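
A hedged, standalone json4s sketch (not Spark's JsonProtocol; field values are invented) of the consequence of this change: only the two underlying counters are serialized, and the total is recomputed on the reading side rather than round-tripped.

import org.json4s._
import org.json4s.JsonDSL._
import org.json4s.jackson.JsonMethods._

implicit val formats: Formats = DefaultFormats

// Write only the stored counters.
val json = ("Remote Blocks Fetched" -> 800) ~ ("Local Blocks Fetched" -> 700)
println(compact(render(json)))

// Read them back and derive the total instead of extracting a stored field.
val remote = (json \ "Remote Blocks Fetched").extract[Int]
val local = (json \ "Local Blocks Fetched").extract[Int]
val total = remote + local  // previously a separate "Total Blocks Fetched" field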

core/src/test/scala/org/apache/spark/PartitioningSuite.scala

Lines changed: 14 additions & 0 deletions
@@ -91,6 +91,17 @@ class PartitioningSuite extends FunSuite with SharedSparkContext with PrivateMet
     }
   }

+  test("RangePartitioner for keys that are not Comparable (but with Ordering)") {
+    // Row does not extend Comparable, but has an implicit Ordering defined.
+    implicit object RowOrdering extends Ordering[Row] {
+      override def compare(x: Row, y: Row) = x.value - y.value
+    }
+
+    val rdd = sc.parallelize(1 to 4500).map(x => (Row(x), Row(x)))
+    val partitioner = new RangePartitioner(1500, rdd)
+    partitioner.getPartition(Row(100))
+  }
+
   test("HashPartitioner not equal to RangePartitioner") {
     val rdd = sc.parallelize(1 to 10).map(x => (x, x))
     val rangeP2 = new RangePartitioner(2, rdd)
@@ -177,3 +188,6 @@ class PartitioningSuite extends FunSuite with SharedSparkContext with PrivateMet
   // Add other tests here for classes that should be able to handle empty partitions correctly
   }
 }
+
+
+private sealed case class Row(value: Int)

core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala

Lines changed: 0 additions & 3 deletions
@@ -314,7 +314,6 @@ class JsonProtocolSuite extends FunSuite {

   private def assertEquals(metrics1: ShuffleReadMetrics, metrics2: ShuffleReadMetrics) {
     assert(metrics1.shuffleFinishTime === metrics2.shuffleFinishTime)
-    assert(metrics1.totalBlocksFetched === metrics2.totalBlocksFetched)
     assert(metrics1.remoteBlocksFetched === metrics2.remoteBlocksFetched)
     assert(metrics1.localBlocksFetched === metrics2.localBlocksFetched)
     assert(metrics1.fetchWaitTime === metrics2.fetchWaitTime)
@@ -513,7 +512,6 @@
     } else {
       val sr = new ShuffleReadMetrics
       sr.shuffleFinishTime = b + c
-      sr.totalBlocksFetched = e + f
       sr.remoteBytesRead = b + d
       sr.localBlocksFetched = e
       sr.fetchWaitTime = a + d
@@ -584,7 +582,6 @@
       | "Memory Bytes Spilled":800,"Disk Bytes Spilled":0,
       | "Shuffle Read Metrics":{
       | "Shuffle Finish Time":900,
-      | "Total Blocks Fetched":1500,
       | "Remote Blocks Fetched":800,
       | "Local Blocks Fetched":700,
       | "Fetch Wait Time":900,

docs/mllib-clustering.md

Lines changed: 48 additions & 1 deletion
@@ -69,7 +69,54 @@ println("Within Set Sum of Squared Errors = " + WSSSE)
 All of MLlib's methods use Java-friendly types, so you can import and call them there the same
 way you do in Scala. The only caveat is that the methods take Scala RDD objects, while the
 Spark Java API uses a separate `JavaRDD` class. You can convert a Java RDD to a Scala one by
-calling `.rdd()` on your `JavaRDD` object.
+calling `.rdd()` on your `JavaRDD` object. A standalone application example
+that is equivalent to the provided example in Scala is given bellow:
+
+{% highlight java %}
+import org.apache.spark.api.java.*;
+import org.apache.spark.api.java.function.Function;
+import org.apache.spark.mllib.clustering.KMeans;
+import org.apache.spark.mllib.clustering.KMeansModel;
+import org.apache.spark.mllib.linalg.Vector;
+import org.apache.spark.mllib.linalg.Vectors;
+import org.apache.spark.SparkConf;
+
+public class KMeansExample {
+  public static void main(String[] args) {
+    SparkConf conf = new SparkConf().setAppName("K-means Example");
+    JavaSparkContext sc = new JavaSparkContext(conf);
+
+    // Load and parse data
+    String path = "data/mllib/kmeans_data.txt";
+    JavaRDD<String> data = sc.textFile(path);
+    JavaRDD<Vector> parsedData = data.map(
+      new Function<String, Vector>() {
+        public Vector call(String s) {
+          String[] sarray = s.split(" ");
+          double[] values = new double[sarray.length];
+          for (int i = 0; i < sarray.length; i++)
+            values[i] = Double.parseDouble(sarray[i]);
+          return Vectors.dense(values);
+        }
+      }
+    );
+
+    // Cluster the data into two classes using KMeans
+    int numClusters = 2;
+    int numIterations = 20;
+    KMeansModel clusters = KMeans.train(parsedData.rdd(), numClusters, numIterations);
+
+    // Evaluate clustering by computing Within Set Sum of Squared Errors
+    double WSSSE = clusters.computeCost(parsedData.rdd());
+    System.out.println("Within Set Sum of Squared Errors = " + WSSSE);
+  }
+}
+{% endhighlight %}
+
+In order to run the above standalone application using Spark framework make
+sure that you follow the instructions provided at section [Standalone
+Applications](quick-start.html) of the quick-start guide. What is more, you
+should include to your build file *spark-mllib* as a dependency.
 </div>

 <div data-lang="python" markdown="1">
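
For comparison, a hedged Scala sketch of the same flow as the Java example added above (assumes a live SparkContext `sc` and the same data/mllib/kmeans_data.txt input; it mirrors the Scala example already in this guide rather than adding new behaviour).

import org.apache.spark.mllib.clustering.KMeans
import org.apache.spark.mllib.linalg.Vectors

// Load and parse the data.
val data = sc.textFile("data/mllib/kmeans_data.txt")
val parsedData = data.map(s => Vectors.dense(s.split(' ').map(_.toDouble)))

// Cluster the data into two classes using KMeans.
val clusters = KMeans.train(parsedData, 2, 20)

// Evaluate clustering by computing Within Set Sum of Squared Errors.
val WSSSE = clusters.computeCost(parsedData)
println("Within Set Sum of Squared Errors = " + WSSSE)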

docs/mllib-collaborative-filtering.md

Lines changed: 79 additions & 1 deletion
@@ -99,7 +99,85 @@ val model = ALS.trainImplicit(ratings, rank, numIterations, alpha)
 All of MLlib's methods use Java-friendly types, so you can import and call them there the same
 way you do in Scala. The only caveat is that the methods take Scala RDD objects, while the
 Spark Java API uses a separate `JavaRDD` class. You can convert a Java RDD to a Scala one by
-calling `.rdd()` on your `JavaRDD` object.
+calling `.rdd()` on your `JavaRDD` object. A standalone application example
+that is equivalent to the provided example in Scala is given bellow:
+
+{% highlight java %}
+import scala.Tuple2;
+
+import org.apache.spark.api.java.*;
+import org.apache.spark.api.java.function.Function;
+import org.apache.spark.mllib.recommendation.ALS;
+import org.apache.spark.mllib.recommendation.MatrixFactorizationModel;
+import org.apache.spark.mllib.recommendation.Rating;
+import org.apache.spark.SparkConf;
+
+public class CollaborativeFiltering {
+  public static void main(String[] args) {
+    SparkConf conf = new SparkConf().setAppName("Collaborative Filtering Example");
+    JavaSparkContext sc = new JavaSparkContext(conf);
+
+    // Load and parse the data
+    String path = "data/mllib/als/test.data";
+    JavaRDD<String> data = sc.textFile(path);
+    JavaRDD<Rating> ratings = data.map(
+      new Function<String, Rating>() {
+        public Rating call(String s) {
+          String[] sarray = s.split(",");
+          return new Rating(Integer.parseInt(sarray[0]), Integer.parseInt(sarray[1]),
+            Double.parseDouble(sarray[2]));
+        }
+      }
+    );
+
+    // Build the recommendation model using ALS
+    int rank = 10;
+    int numIterations = 20;
+    MatrixFactorizationModel model = ALS.train(JavaRDD.toRDD(ratings), rank, numIterations, 0.01);
+
+    // Evaluate the model on rating data
+    JavaRDD<Tuple2<Object, Object>> userProducts = ratings.map(
+      new Function<Rating, Tuple2<Object, Object>>() {
+        public Tuple2<Object, Object> call(Rating r) {
+          return new Tuple2<Object, Object>(r.user(), r.product());
+        }
+      }
+    );
+    JavaPairRDD<Tuple2<Integer, Integer>, Double> predictions = JavaPairRDD.fromJavaRDD(
+      model.predict(JavaRDD.toRDD(userProducts)).toJavaRDD().map(
+        new Function<Rating, Tuple2<Tuple2<Integer, Integer>, Double>>() {
+          public Tuple2<Tuple2<Integer, Integer>, Double> call(Rating r){
+            return new Tuple2<Tuple2<Integer, Integer>, Double>(
+              new Tuple2<Integer, Integer>(r.user(), r.product()), r.rating());
+          }
+        }
+    ));
+    JavaRDD<Tuple2<Double, Double>> ratesAndPreds =
+      JavaPairRDD.fromJavaRDD(ratings.map(
+        new Function<Rating, Tuple2<Tuple2<Integer, Integer>, Double>>() {
+          public Tuple2<Tuple2<Integer, Integer>, Double> call(Rating r){
+            return new Tuple2<Tuple2<Integer, Integer>, Double>(
+              new Tuple2<Integer, Integer>(r.user(), r.product()), r.rating());
+          }
+        }
+      )).join(predictions).values();
+    double MSE = JavaDoubleRDD.fromRDD(ratesAndPreds.map(
+      new Function<Tuple2<Double, Double>, Object>() {
+        public Object call(Tuple2<Double, Double> pair) {
+          Double err = pair._1() - pair._2();
+          return err * err;
+        }
+      }
+    ).rdd()).mean();
+    System.out.println("Mean Squared Error = " + MSE);
+  }
+}
+{% endhighlight %}
+
+In order to run the above standalone application using Spark framework make
+sure that you follow the instructions provided at section [Standalone
+Applications](quick-start.html) of the quick-start guide. What is more, you
+should include to your build file *spark-mllib* as a dependency.
 </div>

 <div data-lang="python" markdown="1">
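
Likewise, a hedged Scala sketch equivalent to the Java example added above (assumes a live SparkContext `sc`, Spark's pair-RDD implicits, and the same data/mllib/als/test.data input).

import org.apache.spark.SparkContext._  // pair-RDD and double-RDD functions
import org.apache.spark.mllib.recommendation.{ALS, Rating}

// Load and parse the data.
val data = sc.textFile("data/mllib/als/test.data")
val ratings = data.map(_.split(',') match {
  case Array(user, item, rate) => Rating(user.toInt, item.toInt, rate.toDouble)
})

// Build the recommendation model using ALS.
val model = ALS.train(ratings, 10, 20, 0.01)

// Evaluate the model on rating data.
val usersProducts = ratings.map { case Rating(user, product, _) => (user, product) }
val predictions = model.predict(usersProducts).map {
  case Rating(user, product, rate) => ((user, product), rate)
}
val ratesAndPreds = ratings.map {
  case Rating(user, product, rate) => ((user, product), rate)
}.join(predictions)
val MSE = ratesAndPreds.map { case (_, (r, p)) => (r - p) * (r - p) }.mean()
println("Mean Squared Error = " + MSE)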
