
Commit c7265dc

Merge branch 'master' of git://git.apache.org/spark into SPARK-4949

2 parents 42ca528 + e79a7a6
48 files changed: +527 additions, -319 deletions

core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala

Lines changed: 22 additions & 6 deletions
@@ -98,7 +98,13 @@ class DAGScheduler(
 
   private[scheduler] val activeJobs = new HashSet[ActiveJob]
 
-  // Contains the locations that each RDD's partitions are cached on
+  /**
+   * Contains the locations that each RDD's partitions are cached on. This map's keys are RDD ids
+   * and its values are arrays indexed by partition numbers. Each array value is the set of
+   * locations where that RDD partition is cached.
+   *
+   * All accesses to this map should be guarded by synchronizing on it (see SPARK-4454).
+   */
   private val cacheLocs = new HashMap[Int, Array[Seq[TaskLocation]]]
 
   // For tracking failed nodes, we use the MapOutputTracker's epoch number, which is sent with
@@ -183,7 +189,8 @@ class DAGScheduler(
     eventProcessLoop.post(TaskSetFailed(taskSet, reason))
   }
 
-  private def getCacheLocs(rdd: RDD[_]): Array[Seq[TaskLocation]] = {
+  private def getCacheLocs(rdd: RDD[_]): Array[Seq[TaskLocation]] = cacheLocs.synchronized {
+    // Note: this doesn't use `getOrElse()` because this method is called O(num tasks) times
     if (!cacheLocs.contains(rdd.id)) {
       val blockIds = rdd.partitions.indices.map(index => RDDBlockId(rdd.id, index)).toArray[BlockId]
       val locs = BlockManager.blockIdsToBlockManagers(blockIds, env, blockManagerMaster)
@@ -194,7 +201,7 @@ class DAGScheduler(
     cacheLocs(rdd.id)
   }
 
-  private def clearCacheLocs() {
+  private def clearCacheLocs(): Unit = cacheLocs.synchronized {
     cacheLocs.clear()
   }
 
@@ -1276,17 +1283,26 @@ class DAGScheduler(
   }
 
   /**
-   * Synchronized method that might be called from other threads.
+   * Gets the locality information associated with a partition of a particular RDD.
+   *
+   * This method is thread-safe and is called from both DAGScheduler and SparkContext.
+   *
   * @param rdd whose partitions are to be looked at
   * @param partition to lookup locality information for
   * @return list of machines that are preferred by the partition
   */
  private[spark]
-  def getPreferredLocs(rdd: RDD[_], partition: Int): Seq[TaskLocation] = synchronized {
+  def getPreferredLocs(rdd: RDD[_], partition: Int): Seq[TaskLocation] = {
    getPreferredLocsInternal(rdd, partition, new HashSet)
  }
 
-  /** Recursive implementation for getPreferredLocs. */
+  /**
+   * Recursive implementation for getPreferredLocs.
+   *
+   * This method is thread-safe because it only accesses DAGScheduler state through thread-safe
+   * methods (getCacheLocs()); please be careful when modifying this method, because any new
+   * DAGScheduler state accessed by it may require additional synchronization.
+   */
  private def getPreferredLocsInternal(
      rdd: RDD[_],
      partition: Int,

docs/mllib-ensembles.md

Lines changed: 1 addition & 1 deletion
@@ -458,7 +458,7 @@ val (trainingData, testData) = (splits(0), splits(1))
 // The defaultParams for Classification use LogLoss by default.
 val boostingStrategy = BoostingStrategy.defaultParams("Classification")
 boostingStrategy.numIterations = 3 // Note: Use more iterations in practice.
-boostingStrategy.treeStrategy.numClassesForClassification = 2
+boostingStrategy.treeStrategy.numClasses = 2
 boostingStrategy.treeStrategy.maxDepth = 5
 // Empty categoricalFeaturesInfo indicates all features are continuous.
 boostingStrategy.treeStrategy.categoricalFeaturesInfo = Map[Int, Int]()
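
The documentation fix tracks a rename in MLlib's tree API: the per-tree strategy field is now `numClasses` rather than `numClassesForClassification`. A minimal sketch of the corrected configuration in use, assuming `trainingData: RDD[LabeledPoint]` is already loaded and `sc` is an existing SparkContext:

import org.apache.spark.mllib.tree.GradientBoostedTrees
import org.apache.spark.mllib.tree.configuration.BoostingStrategy

val boostingStrategy = BoostingStrategy.defaultParams("Classification")
boostingStrategy.numIterations = 3            // use more iterations in practice
boostingStrategy.treeStrategy.numClasses = 2  // renamed from numClassesForClassification
boostingStrategy.treeStrategy.maxDepth = 5

val model = GradientBoostedTrees.train(trainingData, boostingStrategy)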

examples/src/main/scala/org/apache/spark/examples/ml/CrossValidatorExample.scala

Lines changed: 1 addition & 1 deletion
@@ -90,7 +90,7 @@ object CrossValidatorExample {
     crossval.setNumFolds(2) // Use 3+ in practice
 
     // Run cross-validation, and choose the best set of parameters.
-    val cvModel = crossval.fit(training.toDF)
+    val cvModel = crossval.fit(training.toDF())
 
     // Prepare test documents, which are unlabeled.
     val test = sc.parallelize(Seq(
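
This and the following example files make the same mechanical change: the implicit RDD-to-DataFrame conversion is now invoked as `toDF()` with explicit parentheses. The conversion also has an overload taking column names, and the examples standardize on the explicit empty-parens call. A minimal sketch of both forms, assuming an existing SparkContext `sc`; the `Record` case class and column names are illustrative only:

import org.apache.spark.sql.SQLContext

case class Record(id: Int, value: String)

val sqlContext = new SQLContext(sc)
import sqlContext.implicits._

val rdd = sc.parallelize((1 to 3).map(i => Record(i, s"val_$i")))
val df1 = rdd.toDF()                             // column names taken from the case class
val df2 = rdd.toDF("record_id", "record_value")  // or supplied explicitly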

examples/src/main/scala/org/apache/spark/examples/ml/DeveloperApiExample.scala

Lines changed: 2 additions & 2 deletions
@@ -58,7 +58,7 @@ object DeveloperApiExample {
     lr.setMaxIter(10)
 
     // Learn a LogisticRegression model. This uses the parameters stored in lr.
-    val model = lr.fit(training.toDF)
+    val model = lr.fit(training.toDF())
 
     // Prepare test data.
     val test = sc.parallelize(Seq(
@@ -67,7 +67,7 @@ object DeveloperApiExample {
       LabeledPoint(1.0, Vectors.dense(0.0, 2.2, -1.5))))
 
     // Make predictions on test data.
-    val sumPredictions: Double = model.transform(test.toDF)
+    val sumPredictions: Double = model.transform(test.toDF())
       .select("features", "label", "prediction")
       .collect()
       .map { case Row(features: Vector, label: Double, prediction: Double) =>

examples/src/main/scala/org/apache/spark/examples/ml/MovieLensALS.scala

Lines changed: 3 additions & 3 deletions
@@ -137,9 +137,9 @@ object MovieLensALS {
       .setRegParam(params.regParam)
       .setNumBlocks(params.numBlocks)
 
-    val model = als.fit(training.toDF)
+    val model = als.fit(training.toDF())
 
-    val predictions = model.transform(test.toDF).cache()
+    val predictions = model.transform(test.toDF()).cache()
 
     // Evaluate the model.
     // TODO: Create an evaluator to compute RMSE.
@@ -158,7 +158,7 @@ object MovieLensALS {
 
     // Inspect false positives.
     predictions.registerTempTable("prediction")
-    sc.textFile(params.movies).map(Movie.parseMovie).toDF.registerTempTable("movie")
+    sc.textFile(params.movies).map(Movie.parseMovie).toDF().registerTempTable("movie")
     sqlContext.sql(
       """
         |SELECT userId, prediction.movieId, title, rating, prediction

examples/src/main/scala/org/apache/spark/examples/ml/SimpleParamsExample.scala

Lines changed: 3 additions & 3 deletions
@@ -58,7 +58,7 @@ object SimpleParamsExample {
       .setRegParam(0.01)
 
     // Learn a LogisticRegression model. This uses the parameters stored in lr.
-    val model1 = lr.fit(training.toDF)
+    val model1 = lr.fit(training.toDF())
     // Since model1 is a Model (i.e., a Transformer produced by an Estimator),
     // we can view the parameters it used during fit().
     // This prints the parameter (name: value) pairs, where names are unique IDs for this
@@ -77,7 +77,7 @@ object SimpleParamsExample {
 
     // Now learn a new model using the paramMapCombined parameters.
     // paramMapCombined overrides all parameters set earlier via lr.set* methods.
-    val model2 = lr.fit(training.toDF, paramMapCombined)
+    val model2 = lr.fit(training.toDF(), paramMapCombined)
     println("Model 2 was fit using parameters: " + model2.fittingParamMap)
 
     // Prepare test data.
@@ -90,7 +90,7 @@ object SimpleParamsExample {
     // LogisticRegression.transform will only use the 'features' column.
     // Note that model2.transform() outputs a 'myProbability' column instead of the usual
     // 'probability' column since we renamed the lr.probabilityCol parameter previously.
-    model2.transform(test.toDF)
+    model2.transform(test.toDF())
       .select("features", "label", "myProbability", "prediction")
       .collect()
       .foreach { case Row(features: Vector, label: Double, prob: Vector, prediction: Double) =>

examples/src/main/scala/org/apache/spark/examples/ml/SimpleTextClassificationPipeline.scala

Lines changed: 2 additions & 2 deletions
@@ -69,7 +69,7 @@ object SimpleTextClassificationPipeline {
       .setStages(Array(tokenizer, hashingTF, lr))
 
     // Fit the pipeline to training documents.
-    val model = pipeline.fit(training.toDF)
+    val model = pipeline.fit(training.toDF())
 
     // Prepare test documents, which are unlabeled.
     val test = sc.parallelize(Seq(
@@ -79,7 +79,7 @@ object SimpleTextClassificationPipeline {
       Document(7L, "apache hadoop")))
 
     // Make predictions on test documents.
-    model.transform(test.toDF)
+    model.transform(test.toDF())
       .select("id", "text", "probability", "prediction")
       .collect()
       .foreach { case Row(id: Long, text: String, prob: Vector, prediction: Double) =>

examples/src/main/scala/org/apache/spark/examples/mllib/DatasetExample.scala

Lines changed: 1 addition & 1 deletion
@@ -81,7 +81,7 @@ object DatasetExample {
     println(s"Loaded ${origData.count()} instances from file: ${params.input}")
 
     // Convert input data to DataFrame explicitly.
-    val df: DataFrame = origData.toDF
+    val df: DataFrame = origData.toDF()
     println(s"Inferred schema:\n${df.schema.prettyJson}")
     println(s"Converted to DataFrame with ${df.count()} records")
 

examples/src/main/scala/org/apache/spark/examples/sql/RDDRelation.scala

Lines changed: 1 addition & 1 deletion
@@ -34,7 +34,7 @@ object RDDRelation {
     // Importing the SQL context gives access to all the SQL functions and implicit conversions.
     import sqlContext.implicits._
 
-    val df = sc.parallelize((1 to 100).map(i => Record(i, s"val_$i"))).toDF
+    val df = sc.parallelize((1 to 100).map(i => Record(i, s"val_$i"))).toDF()
     // Any RDD containing case classes can be registered as a table. The schema of the table is
     // automatically inferred using scala reflection.
     df.registerTempTable("records")

examples/src/main/scala/org/apache/spark/examples/sql/hive/HiveFromSpark.scala

Lines changed: 1 addition & 1 deletion
@@ -68,7 +68,7 @@ object HiveFromSpark {
 
     // You can also register RDDs as temporary tables within a HiveContext.
     val rdd = sc.parallelize((1 to 100).map(i => Record(i, s"val_$i")))
-    rdd.toDF.registerTempTable("records")
+    rdd.toDF().registerTempTable("records")
 
     // Queries can then join RDD data with data stored in Hive.
     println("Result of SELECT *:")

external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala

Lines changed: 16 additions & 12 deletions
@@ -20,20 +20,21 @@ package org.apache.spark.streaming.kafka
 import java.io.File
 
 import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
 import scala.concurrent.duration._
 import scala.language.postfixOps
 
+import kafka.common.TopicAndPartition
+import kafka.message.MessageAndMetadata
 import kafka.serializer.StringDecoder
 import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll}
-import org.scalatest.concurrent.{Eventually, Timeouts}
+import org.scalatest.concurrent.Eventually
 
-import org.apache.spark.{SparkContext, SparkConf}
+import org.apache.spark.{SparkConf, SparkContext}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.streaming.{Milliseconds, StreamingContext, Time}
-import org.apache.spark.streaming.dstream.{DStream, InputDStream}
+import org.apache.spark.streaming.dstream.DStream
 import org.apache.spark.util.Utils
-import kafka.common.TopicAndPartition
-import kafka.message.MessageAndMetadata
 
 class DirectKafkaStreamSuite extends KafkaStreamSuiteBase
   with BeforeAndAfter with BeforeAndAfterAll with Eventually {
@@ -67,13 +68,14 @@ class DirectKafkaStreamSuite extends KafkaStreamSuiteBase
   }
 
 
-  ignore("basic stream receiving with multiple topics and smallest starting offset") {
+  test("basic stream receiving with multiple topics and smallest starting offset") {
     val topics = Set("basic1", "basic2", "basic3")
     val data = Map("a" -> 7, "b" -> 9)
     topics.foreach { t =>
       createTopic(t)
       sendMessages(t, data)
     }
+    val totalSent = data.values.sum * topics.size
     val kafkaParams = Map(
       "metadata.broker.list" -> s"$brokerAddress",
       "auto.offset.reset" -> "smallest"
@@ -84,7 +86,8 @@ class DirectKafkaStreamSuite extends KafkaStreamSuiteBase
       KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
         ssc, kafkaParams, topics)
     }
-    var total = 0L
+
+    val allReceived = new ArrayBuffer[(String, String)]
 
     stream.foreachRDD { rdd =>
       // Get the offset ranges in the RDD
@@ -104,16 +107,17 @@ class DirectKafkaStreamSuite extends KafkaStreamSuiteBase
       collected.foreach { case (partSize, rangeSize) =>
         assert(partSize === rangeSize, "offset ranges are wrong")
       }
-      total += collected.size // Add up all the collected items
     }
+    stream.foreachRDD { rdd => allReceived ++= rdd.collect() }
     ssc.start()
     eventually(timeout(20000.milliseconds), interval(200.milliseconds)) {
-      assert(total === data.values.sum * topics.size, "didn't get all messages")
+      assert(allReceived.size === totalSent,
+        "didn't get expected number of messages, messages:\n" + allReceived.mkString("\n"))
     }
     ssc.stop()
   }
 
-  ignore("receiving from largest starting offset") {
+  test("receiving from largest starting offset") {
     val topic = "largest"
     val topicPartition = TopicAndPartition(topic, 0)
     val data = Map("a" -> 10)
@@ -158,7 +162,7 @@ class DirectKafkaStreamSuite extends KafkaStreamSuiteBase
   }
 
 
-  ignore("creating stream by offset") {
+  test("creating stream by offset") {
     val topic = "offset"
     val topicPartition = TopicAndPartition(topic, 0)
     val data = Map("a" -> 10)
@@ -204,7 +208,7 @@ class DirectKafkaStreamSuite extends KafkaStreamSuiteBase
   }
 
   // Test to verify the offset ranges can be recovered from the checkpoints
-  ignore("offset recovery") {
+  test("offset recovery") {
     val topic = "recovery"
     createTopic(topic)
     testDir = Utils.createTempDir()
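
For readers unfamiliar with the direct Kafka API, the offset-range check these tests perform relies on the fact that RDDs produced by `createDirectStream` carry the Kafka offsets they cover via `HasOffsetRanges`. A minimal sketch of that access pattern, assuming a `stream` created as in the test above (not a standalone program):

import org.apache.spark.streaming.kafka.{HasOffsetRanges, OffsetRange}

stream.foreachRDD { rdd =>
  // Each RDD from the direct stream exposes the offset ranges it was built from.
  val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  offsetRanges.foreach { range =>
    println(s"${range.topic}-${range.partition}: ${range.fromOffset} -> ${range.untilOffset}")
  }
}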

mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala

Lines changed: 1 addition & 1 deletion
@@ -102,7 +102,7 @@ object NaiveBayesModel extends Loader[NaiveBayesModel] {
       sc.parallelize(Seq(metadata), 1).saveAsTextFile(metadataPath(path))
 
       // Create Parquet data.
-      val dataRDD: DataFrame = sc.parallelize(Seq(data), 1).toDF
+      val dataRDD: DataFrame = sc.parallelize(Seq(data), 1).toDF()
       dataRDD.saveAsParquetFile(dataPath(path))
     }
 
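
This and the next few MLlib files share the same save path: wrap the model's data in a case class, convert the one-element RDD to a DataFrame, and write it out with `saveAsParquetFile`. A minimal sketch of that pattern outside MLlib; the `ModelData` case class and the output path are illustrative, not from this commit, and `sc` is assumed to be an existing SparkContext:

import org.apache.spark.sql.SQLContext

case class ModelData(weights: Array[Double], intercept: Double)

val sqlContext = new SQLContext(sc)
import sqlContext.implicits._

// One row holding the model's parameters, written as a Parquet file.
val data = ModelData(Array(0.1, 0.2, 0.3), intercept = 0.0)
sc.parallelize(Seq(data), 1).toDF().saveAsParquetFile("/tmp/model-data.parquet")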

mllib/src/main/scala/org/apache/spark/mllib/classification/impl/GLMClassificationModel.scala

Lines changed: 1 addition & 1 deletion
@@ -62,7 +62,7 @@ private[classification] object GLMClassificationModel {
 
       // Create Parquet data.
       val data = Data(weights, intercept, threshold)
-      sc.parallelize(Seq(data), 1).toDF.saveAsParquetFile(Loader.dataPath(path))
+      sc.parallelize(Seq(data), 1).toDF().saveAsParquetFile(Loader.dataPath(path))
     }
 
     /**

mllib/src/main/scala/org/apache/spark/mllib/regression/impl/GLMRegressionModel.scala

Lines changed: 1 addition & 1 deletion
@@ -58,7 +58,7 @@ private[regression] object GLMRegressionModel {
 
       // Create Parquet data.
       val data = Data(weights, intercept)
-      val dataRDD: DataFrame = sc.parallelize(Seq(data), 1).toDF
+      val dataRDD: DataFrame = sc.parallelize(Seq(data), 1).toDF()
       // TODO: repartition with 1 partition after SPARK-5532 gets fixed
       dataRDD.saveAsParquetFile(Loader.dataPath(path))
     }

mllib/src/main/scala/org/apache/spark/mllib/tree/model/DecisionTreeModel.scala

Lines changed: 1 addition & 1 deletion
@@ -197,7 +197,7 @@ object DecisionTreeModel extends Loader[DecisionTreeModel] {
     val nodes = model.topNode.subtreeIterator.toSeq
     val dataRDD: DataFrame = sc.parallelize(nodes)
       .map(NodeData.apply(0, _))
-      .toDF
+      .toDF()
     dataRDD.saveAsParquetFile(Loader.dataPath(path))
   }
 
mllib/src/main/scala/org/apache/spark/mllib/tree/model/treeEnsembleModels.scala

Lines changed: 1 addition & 1 deletion
@@ -289,7 +289,7 @@ private[tree] object TreeEnsembleModel {
     // Create Parquet data.
     val dataRDD = sc.parallelize(model.trees.zipWithIndex).flatMap { case (tree, treeId) =>
       tree.topNode.subtreeIterator.toSeq.map(node => NodeData(treeId, node))
-    }.toDF
+    }.toDF()
     dataRDD.saveAsParquetFile(Loader.dataPath(path))
   }
 
mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala

Lines changed: 2 additions & 2 deletions
@@ -358,8 +358,8 @@ class ALSSuite extends FunSuite with MLlibTestSparkContext with Logging {
       .setNumUserBlocks(numUserBlocks)
       .setNumItemBlocks(numItemBlocks)
     val alpha = als.getAlpha
-    val model = als.fit(training.toDF)
-    val predictions = model.transform(test.toDF)
+    val model = als.fit(training.toDF())
+    val predictions = model.transform(test.toDF())
       .select("rating", "prediction")
       .map { case Row(rating: Float, prediction: Float) =>
         (rating.toDouble, prediction.toDouble)

python/pyspark/sql/dataframe.py

Lines changed: 7 additions & 1 deletion
@@ -434,12 +434,18 @@ def unpersist(self, blocking=True):
     def repartition(self, numPartitions):
         """ Return a new :class:`DataFrame` that has exactly `numPartitions`
         partitions.
+
+        >>> df.repartition(10).rdd.getNumPartitions()
+        10
         """
-        return DataFrame(self._jdf.repartition(numPartitions, None), self.sql_ctx)
+        return DataFrame(self._jdf.repartition(numPartitions), self.sql_ctx)
 
     def distinct(self):
         """
         Return a new :class:`DataFrame` containing the distinct rows in this DataFrame.
+
+        >>> df.distinct().count()
+        2L
         """
         return DataFrame(self._jdf.distinct(), self.sql_ctx)

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala

Lines changed: 10 additions & 0 deletions
@@ -23,6 +23,16 @@ import org.apache.spark.sql.types._
 
 case class Project(projectList: Seq[NamedExpression], child: LogicalPlan) extends UnaryNode {
   def output = projectList.map(_.toAttribute)
+
+  override lazy val resolved: Boolean = {
+    val containsAggregatesOrGenerators = projectList.exists ( _.collect {
+        case agg: AggregateExpression => agg
+        case generator: Generator => generator
+      }.nonEmpty
+    )
+
+    !expressions.exists(!_.resolved) && childrenResolved && !containsAggregatesOrGenerators
+  }
 }
 
 /**
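
One way to read the Catalyst change above: a `Project` whose project list still contains an aggregate or a generator does not report itself as resolved, which leaves the analyzer room to rewrite the plan before later rules treat it as final. A hypothetical query that exercises that path; the `sqlContext`, table name, and column are assumed for illustration and are not part of this commit:

// The sum(...) in the select list must end up under an aggregation operator
// in the analyzed plan rather than staying inside a plain Project.
val totals = sqlContext.sql("SELECT sum(amount) AS total FROM records")
totals.explain(true)   // prints the analyzed and physical plans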
