Commit 824f761

Merge remote-tracking branch 'upstream/master' into expr_bin
2 parents: 50e0c3b + 490d5a7

40 files changed: +679 additions, −208 deletions


R/pkg/R/serialize.R

Lines changed: 8 additions & 0 deletions
@@ -37,6 +37,14 @@ writeObject <- function(con, object, writeType = TRUE) {
   # passing in vectors as arrays and instead require arrays to be passed
   # as lists.
   type <- class(object)[[1]] # class of POSIXlt is c("POSIXlt", "POSIXt")
+  # Checking types is needed here, since `is.na` only handles atomic vectors,
+  # lists and pairlists
+  if (type %in% c("integer", "character", "logical", "double", "numeric")) {
+    if (is.na(object)) {
+      object <- NULL
+      type <- "NULL"
+    }
+  }
   if (writeType) {
     writeType(con, type)
   }

R/pkg/inst/tests/test_sparkSQL.R

Lines changed: 37 additions & 0 deletions
@@ -101,6 +101,43 @@ test_that("create DataFrame from RDD", {
   expect_equal(dtypes(df), list(c("a", "int"), c("b", "string")))
 })

+test_that("convert NAs to null type in DataFrames", {
+  rdd <- parallelize(sc, list(list(1L, 2L), list(NA, 4L)))
+  df <- createDataFrame(sqlContext, rdd, list("a", "b"))
+  expect_true(is.na(collect(df)[2, "a"]))
+  expect_equal(collect(df)[2, "b"], 4L)
+
+  l <- data.frame(x = 1L, y = c(1L, NA_integer_, 3L))
+  df <- createDataFrame(sqlContext, l)
+  expect_equal(collect(df)[2, "x"], 1L)
+  expect_true(is.na(collect(df)[2, "y"]))
+
+  rdd <- parallelize(sc, list(list(1, 2), list(NA, 4)))
+  df <- createDataFrame(sqlContext, rdd, list("a", "b"))
+  expect_true(is.na(collect(df)[2, "a"]))
+  expect_equal(collect(df)[2, "b"], 4)
+
+  l <- data.frame(x = 1, y = c(1, NA_real_, 3))
+  df <- createDataFrame(sqlContext, l)
+  expect_equal(collect(df)[2, "x"], 1)
+  expect_true(is.na(collect(df)[2, "y"]))
+
+  l <- list("a", "b", NA, "d")
+  df <- createDataFrame(sqlContext, l)
+  expect_true(is.na(collect(df)[3, "_1"]))
+  expect_equal(collect(df)[4, "_1"], "d")
+
+  l <- list("a", "b", NA_character_, "d")
+  df <- createDataFrame(sqlContext, l)
+  expect_true(is.na(collect(df)[3, "_1"]))
+  expect_equal(collect(df)[4, "_1"], "d")
+
+  l <- list(TRUE, FALSE, NA, TRUE)
+  df <- createDataFrame(sqlContext, l)
+  expect_true(is.na(collect(df)[3, "_1"]))
+  expect_equal(collect(df)[4, "_1"], TRUE)
+})
+
 test_that("toDF", {
   rdd <- lapply(parallelize(sc, 1:10), function(x) { list(x, as.character(x)) })
   df <- toDF(rdd, list("a", "b"))

core/src/main/scala/org/apache/spark/serializer/Serializer.scala

Lines changed: 1 addition & 0 deletions
@@ -182,6 +182,7 @@ abstract class DeserializationStream {
       } catch {
         case eof: EOFException =>
           finished = true
+          null
       }
     }
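
A note on the one-line addition above: in Scala, try/catch is an expression, and the last expression in the catch handler is the value the whole expression produces. A minimal sketch of the pattern (not Spark's actual NextIterator code; the helper below is hypothetical):

import java.io.EOFException

object EofExample {
  private var finished = false

  // Read one record, or mark the stream as finished and yield null at EOF.
  def readNext(read: () => AnyRef): AnyRef =
    try {
      read()
    } catch {
      case _: EOFException =>
        finished = true // the assignment alone has type Unit
        null            // so the handler explicitly yields null, matching the try branch
    }
}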

core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala

Lines changed: 1 addition & 1 deletion
@@ -539,11 +539,11 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
   /**
    * For testing only. Wait until at least `numExecutors` executors are up, or throw
    * `TimeoutException` if the waiting time elapsed before `numExecutors` executors up.
+   * Exposed for testing.
    *
    * @param numExecutors the number of executors to wait at least
    * @param timeout time to wait in milliseconds
    */
-  @VisibleForTesting
   private[spark] def waitUntilExecutorsUp(numExecutors: Int, timeout: Long): Unit = {
     val finishTime = System.currentTimeMillis() + timeout
     while (System.currentTimeMillis() < finishTime) {

core/src/test/scala/org/apache/spark/deploy/SparkSubmitUtilsSuite.scala

Lines changed: 12 additions & 10 deletions
@@ -28,9 +28,12 @@ import org.apache.ivy.plugins.resolver.IBiblioResolver

 import org.apache.spark.SparkFunSuite
 import org.apache.spark.deploy.SparkSubmitUtils.MavenCoordinate
+import org.apache.spark.util.Utils

 class SparkSubmitUtilsSuite extends SparkFunSuite with BeforeAndAfterAll {

+  private var tempIvyPath: String = _
+
   private val noOpOutputStream = new OutputStream {
     def write(b: Int) = {}
   }
@@ -47,6 +50,7 @@ class SparkSubmitUtilsSuite extends SparkFunSuite with BeforeAndAfterAll {
     super.beforeAll()
     // We don't want to write logs during testing
     SparkSubmitUtils.printStream = new BufferPrintStream
+    tempIvyPath = Utils.createTempDir(namePrefix = "ivy").getAbsolutePath()
   }

   test("incorrect maven coordinate throws error") {
@@ -90,21 +94,20 @@ class SparkSubmitUtilsSuite extends SparkFunSuite with BeforeAndAfterAll {
   }

   test("ivy path works correctly") {
-    val ivyPath = "dummy" + File.separator + "ivy"
     val md = SparkSubmitUtils.getModuleDescriptor
     val artifacts = for (i <- 0 until 3) yield new MDArtifact(md, s"jar-$i", "jar", "jar")
-    var jPaths = SparkSubmitUtils.resolveDependencyPaths(artifacts.toArray, new File(ivyPath))
+    var jPaths = SparkSubmitUtils.resolveDependencyPaths(artifacts.toArray, new File(tempIvyPath))
     for (i <- 0 until 3) {
-      val index = jPaths.indexOf(ivyPath)
+      val index = jPaths.indexOf(tempIvyPath)
       assert(index >= 0)
-      jPaths = jPaths.substring(index + ivyPath.length)
+      jPaths = jPaths.substring(index + tempIvyPath.length)
     }
     val main = MavenCoordinate("my.awesome.lib", "mylib", "0.1")
     IvyTestUtils.withRepository(main, None, None) { repo =>
       // end to end
       val jarPath = SparkSubmitUtils.resolveMavenCoordinates(main.toString, Option(repo),
-        Option(ivyPath), true)
-      assert(jarPath.indexOf(ivyPath) >= 0, "should use non-default ivy path")
+        Option(tempIvyPath), true)
+      assert(jarPath.indexOf(tempIvyPath) >= 0, "should use non-default ivy path")
     }
   }

@@ -123,13 +126,12 @@ class SparkSubmitUtilsSuite extends SparkFunSuite with BeforeAndAfterAll {
       assert(jarPath.indexOf("mylib") >= 0, "should find artifact")
     }
     // Local ivy repository with modified home
-    val dummyIvyPath = "dummy" + File.separator + "ivy"
-    val dummyIvyLocal = new File(dummyIvyPath, "local" + File.separator)
+    val dummyIvyLocal = new File(tempIvyPath, "local" + File.separator)
     IvyTestUtils.withRepository(main, None, Some(dummyIvyLocal), true) { repo =>
       val jarPath = SparkSubmitUtils.resolveMavenCoordinates(main.toString, None,
-        Some(dummyIvyPath), true)
+        Some(tempIvyPath), true)
       assert(jarPath.indexOf("mylib") >= 0, "should find artifact")
-      assert(jarPath.indexOf(dummyIvyPath) >= 0, "should be in new ivy path")
+      assert(jarPath.indexOf(tempIvyPath) >= 0, "should be in new ivy path")
     }
   }

docs/mllib-frequent-pattern-mining.md

Lines changed: 2 additions & 2 deletions
@@ -39,11 +39,11 @@ MLlib's FP-growth implementation takes the following (hyper-)parameters:
 <div class="codetabs">
 <div data-lang="scala" markdown="1">

-[`FPGrowth`](api/java/org/apache/spark/mllib/fpm/FPGrowth.html) implements the
+[`FPGrowth`](api/scala/index.html#org.apache.spark.mllib.fpm.FPGrowth) implements the
 FP-growth algorithm.
 It take a `JavaRDD` of transactions, where each transaction is an `Iterable` of items of a generic type.
 Calling `FPGrowth.run` with transactions returns an
-[`FPGrowthModel`](api/java/org/apache/spark/mllib/fpm/FPGrowthModel.html)
+[`FPGrowthModel`](api/scala/index.html#org.apache.spark.mllib.fpm.FPGrowthModel)
 that stores the frequent itemsets with their frequencies.

 {% highlight scala %}
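
For context on the API those corrected links describe, a hedged sketch of the Scala usage (not part of this diff; the `transactions` RDD is an assumed input):

import org.apache.spark.mllib.fpm.FPGrowth
import org.apache.spark.rdd.RDD

// transactions: each element is one transaction, i.e. an array of item strings
def mineFrequentItemsets(transactions: RDD[Array[String]]): Unit = {
  val fpg = new FPGrowth()
    .setMinSupport(0.2)
    .setNumPartitions(10)
  val model = fpg.run(transactions) // returns an FPGrowthModel
  model.freqItemsets.collect().foreach { itemset =>
    println(itemset.items.mkString("[", ",", "]") + ", " + itemset.freq)
  }
}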

docs/sql-programming-guide.md

Lines changed: 5 additions & 1 deletion
@@ -1102,7 +1102,11 @@ root
 {% endhighlight %}

 Notice that the data types of the partitioning columns are automatically inferred. Currently,
-numeric data types and string type are supported.
+numeric data types and string type are supported. Sometimes users may not want to automatically
+infer the data types of the partitioning columns. For these use cases, the automatic type inference
+can be configured by `spark.sql.sources.partitionColumnTypeInference.enabled`, which is default to
+`true`. When type inference is disabled, string type will be used for the partitioning columns.
+

 ### Schema merging
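
A sketch of how the flag documented above might be toggled from Scala (assumes a Spark 1.4-era `sqlContext` is already in scope, as elsewhere in the guide; the table path is hypothetical):

// Disable automatic type inference for partition columns; they will be read as strings.
sqlContext.setConf("spark.sql.sources.partitionColumnTypeInference.enabled", "false")

val df = sqlContext.read.parquet("/path/to/partitioned/table") // hypothetical path
df.printSchema() // a directory like .../year=2015/ now shows up as a string column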

docs/streaming-kafka-integration.md

Lines changed: 1 addition & 1 deletion
@@ -7,7 +7,7 @@ title: Spark Streaming + Kafka Integration Guide
 ## Approach 1: Receiver-based Approach
 This approach uses a Receiver to receive the data. The Received is implemented using the Kafka high-level consumer API. As with all receivers, the data received from Kafka through a Receiver is stored in Spark executors, and then jobs launched by Spark Streaming processes the data.

-However, under default configuration, this approach can lose data under failures (see [receiver reliability](streaming-programming-guide.html#receiver-reliability). To ensure zero-data loss, you have to additionally enable Write Ahead Logs in Spark Streaming. To ensure zero data loss, enable the Write Ahead Logs (introduced in Spark 1.2). This synchronously saves all the received Kafka data into write ahead logs on a distributed file system (e.g HDFS), so that all the data can be recovered on failure. See [Deploying section](streaming-programming-guide.html#deploying-applications) in the streaming programming guide for more details on Write Ahead Logs.
+However, under default configuration, this approach can lose data under failures (see [receiver reliability](streaming-programming-guide.html#receiver-reliability). To ensure zero-data loss, you have to additionally enable Write Ahead Logs in Spark Streaming (introduced in Spark 1.2). This synchronously saves all the received Kafka data into write ahead logs on a distributed file system (e.g HDFS), so that all the data can be recovered on failure. See [Deploying section](streaming-programming-guide.html#deploying-applications) in the streaming programming guide for more details on Write Ahead Logs.

 Next, we discuss how to use this approach in your streaming application.
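
For reference, a sketch of enabling the Write Ahead Log mentioned above (configuration key per Spark Streaming 1.2+; the checkpoint directory is hypothetical, and checkpointing must also be set for the WAL to be useful):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf()
  .setAppName("KafkaReceiverWithWAL")
  .set("spark.streaming.receiver.writeAheadLog.enable", "true") // turn on the WAL for receivers

val ssc = new StreamingContext(conf, Seconds(10))
ssc.checkpoint("hdfs:///tmp/streaming-checkpoints") // hypothetical checkpoint directory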

mllib/src/main/scala/org/apache/spark/ml/Pipeline.scala

Lines changed: 8 additions & 0 deletions
@@ -17,6 +17,9 @@

 package org.apache.spark.ml

+import java.{util => ju}
+
+import scala.collection.JavaConverters._
 import scala.collection.mutable.ListBuffer

 import org.apache.spark.Logging
@@ -175,6 +178,11 @@ class PipelineModel private[ml] (
     val stages: Array[Transformer])
   extends Model[PipelineModel] with Logging {

+  /** A Java/Python-friendly auxiliary constructor. */
+  private[ml] def this(uid: String, stages: ju.List[Transformer]) = {
+    this(uid, stages.asScala.toArray)
+  }
+
   override def validateParams(): Unit = {
     super.validateParams()
     stages.foreach(_.validateParams())
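
What the new auxiliary constructor buys, as a sketch (the helper object below is hypothetical and would have to live inside the org.apache.spark.ml package, since the constructor is private[ml]): callers that only hold a java.util.List[Transformer], such as the Java/Python wrappers, no longer need to build a Scala Array themselves.

package org.apache.spark.ml

import java.util.{Arrays => JArrays}

object PipelineModelFromJavaSketch { // hypothetical helper, for illustration only
  def build(uid: String, t1: Transformer, t2: Transformer): PipelineModel = {
    val javaStages: java.util.List[Transformer] = JArrays.asList(t1, t2)
    new PipelineModel(uid, javaStages) // delegates to this(uid, stages.asScala.toArray)
  }
}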

mllib/src/main/scala/org/apache/spark/mllib/optimization/GradientDescent.scala

Lines changed: 1 addition & 1 deletion
@@ -179,7 +179,7 @@ object GradientDescent extends Logging {
     * if it's L2 updater; for L1 updater, the same logic is followed.
     */
    var regVal = updater.compute(
-     weights, Vectors.dense(new Array[Double](weights.size)), 0, 1, regParam)._2
+     weights, Vectors.zeros(weights.size), 0, 1, regParam)._2

    for (i <- 1 to numIterations) {
      val bcWeights = data.context.broadcast(weights)
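
The replacement above (and the same change in GeneralizedLinearAlgorithm below) is behavior-preserving: Vectors.zeros(n) builds the same all-zero dense vector that wrapping a freshly allocated Double array did. A quick check, assuming MLlib's linalg API:

import org.apache.spark.mllib.linalg.Vectors

val n = 4
val viaArray = Vectors.dense(new Array[Double](n)) // JVM zero-initializes the array: [0.0, 0.0, 0.0, 0.0]
val viaZeros = Vectors.zeros(n)                    // same contents, clearer intent
assert(viaArray.toArray.sameElements(viaZeros.toArray))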

mllib/src/main/scala/org/apache/spark/mllib/regression/GeneralizedLinearAlgorithm.scala

Lines changed: 3 additions & 3 deletions
@@ -195,11 +195,11 @@ abstract class GeneralizedLinearAlgorithm[M <: GeneralizedLinearModel]
      */
     val initialWeights = {
       if (numOfLinearPredictor == 1) {
-        Vectors.dense(new Array[Double](numFeatures))
+        Vectors.zeros(numFeatures)
       } else if (addIntercept) {
-        Vectors.dense(new Array[Double]((numFeatures + 1) * numOfLinearPredictor))
+        Vectors.zeros((numFeatures + 1) * numOfLinearPredictor)
       } else {
-        Vectors.dense(new Array[Double](numFeatures * numOfLinearPredictor))
+        Vectors.zeros(numFeatures * numOfLinearPredictor)
       }
     }
     run(input, initialWeights)

mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingLinearAlgorithm.scala

Lines changed: 1 addition & 9 deletions
@@ -83,15 +83,7 @@ abstract class StreamingLinearAlgorithm[
       throw new IllegalArgumentException("Model must be initialized before starting training.")
     }
     data.foreachRDD { (rdd, time) =>
-      val initialWeights =
-        model match {
-          case Some(m) =>
-            m.weights
-          case None =>
-            val numFeatures = rdd.first().features.size
-            Vectors.dense(numFeatures)
-        }
-      model = Some(algorithm.run(rdd, initialWeights))
+      model = Some(algorithm.run(rdd, model.get.weights))
       logInfo("Model updated at time %s".format(time.toString))
       val display = model.get.weights.size match {
         case x if x > 100 => model.get.weights.toArray.take(100).mkString("[", ",", "...")
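
Why collapsing the removed match was safe, and why its None branch looked wrong: the guard in the context above already throws when the model is unset, so model.get is always defined inside foreachRDD; and Vectors.dense is varargs over the values, so Vectors.dense(numFeatures) would have produced a one-element vector holding the feature count rather than a zero vector of that length. A minimal illustration (not Spark code):

import org.apache.spark.mllib.linalg.Vectors

val numFeatures = 3
val suspicious = Vectors.dense(numFeatures) // the Int widens to Double: a 1-element vector [3.0]
val intended   = Vectors.zeros(numFeatures) // a 3-element zero vector [0.0, 0.0, 0.0]
assert(suspicious.size == 1 && intended.size == numFeatures)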

mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingLinearRegressionWithSGD.scala

Lines changed: 1 addition & 1 deletion
@@ -79,7 +79,7 @@ class StreamingLinearRegressionWithSGD private[mllib] (
     this
   }

-  /** Set the initial weights. Default: [0.0, 0.0]. */
+  /** Set the initial weights. */
   def setInitialWeights(initialWeights: Vector): this.type = {
     this.model = Some(algorithm.createModel(initialWeights, 0.0))
     this

mllib/src/test/scala/org/apache/spark/ml/PipelineSuite.scala

Lines changed: 17 additions & 0 deletions
@@ -17,6 +17,8 @@

 package org.apache.spark.ml

+import scala.collection.JavaConverters._
+
 import org.mockito.Matchers.{any, eq => meq}
 import org.mockito.Mockito.when
 import org.scalatest.mock.MockitoSugar.mock
@@ -81,4 +83,19 @@ class PipelineSuite extends SparkFunSuite {
       pipeline.fit(dataset)
     }
   }
+
+  test("pipeline model constructors") {
+    val transform0 = mock[Transformer]
+    val model1 = mock[MyModel]
+
+    val stages = Array(transform0, model1)
+    val pipelineModel0 = new PipelineModel("pipeline0", stages)
+    assert(pipelineModel0.uid === "pipeline0")
+    assert(pipelineModel0.stages === stages)
+
+    val stagesAsList = stages.toList.asJava
+    val pipelineModel1 = new PipelineModel("pipeline1", stagesAsList)
+    assert(pipelineModel1.uid === "pipeline1")
+    assert(pipelineModel1.stages === stages)
+  }
 }

pom.xml

Lines changed: 24 additions & 2 deletions
@@ -179,7 +179,7 @@
     <parquet.deps.scope>compile</parquet.deps.scope>

     <!--
-      Overridable test home. So that you can call individual pom files directory without
+      Overridable test home. So that you can call individual pom files directly without
      things breaking.
     -->
     <spark.test.home>${session.executionRootDirectory}</spark.test.home>
@@ -587,7 +587,7 @@
       <dependency>
         <groupId>io.netty</groupId>
         <artifactId>netty-all</artifactId>
-        <version>4.0.23.Final</version>
+        <version>4.0.28.Final</version>
       </dependency>
       <dependency>
         <groupId>org.apache.derby</groupId>
@@ -1256,6 +1256,7 @@
           <systemProperties>
            <derby.system.durability>test</derby.system.durability>
            <java.awt.headless>true</java.awt.headless>
+           <java.io.tmpdir>${project.build.directory}/tmp</java.io.tmpdir>
            <spark.test.home>${spark.test.home}</spark.test.home>
            <spark.testing>1</spark.testing>
            <spark.ui.enabled>false</spark.ui.enabled>
@@ -1289,6 +1290,7 @@
           <systemProperties>
            <derby.system.durability>test</derby.system.durability>
            <java.awt.headless>true</java.awt.headless>
+           <java.io.tmpdir>${project.build.directory}/tmp</java.io.tmpdir>
            <spark.test.home>${spark.test.home}</spark.test.home>
            <spark.testing>1</spark.testing>
            <spark.ui.enabled>false</spark.ui.enabled>
@@ -1548,6 +1550,26 @@
           </execution>
         </executions>
       </plugin>
+
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-antrun-plugin</artifactId>
+        <executions>
+          <execution>
+            <id>create-tmp-dir</id>
+            <phase>generate-test-resources</phase>
+            <goals>
+              <goal>run</goal>
+            </goals>
+            <configuration>
+              <target>
+                <mkdir dir="${project.build.directory}/tmp" />
+              </target>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+
      <!-- Enable surefire and scalatest in all children, in one place: -->
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>

project/SparkBuild.scala

Lines changed: 6 additions & 0 deletions
@@ -51,6 +51,11 @@ object BuildCommons {
   // Root project.
   val spark = ProjectRef(buildLocation, "spark")
   val sparkHome = buildLocation
+
+  val testTempDir = s"$sparkHome/target/tmp"
+  if (!new File(testTempDir).isDirectory()) {
+    require(new File(testTempDir).mkdirs())
+  }
 }

 object SparkBuild extends PomBuild {
@@ -496,6 +501,7 @@ object TestSettings {
     "SPARK_DIST_CLASSPATH" ->
       (fullClasspath in Test).value.files.map(_.getAbsolutePath).mkString(":").stripSuffix(":"),
     "JAVA_HOME" -> sys.env.get("JAVA_HOME").getOrElse(sys.props("java.home"))),
+    javaOptions in Test += s"-Djava.io.tmpdir=$testTempDir",
     javaOptions in Test += "-Dspark.test.home=" + sparkHome,
     javaOptions in Test += "-Dspark.testing=1",
     javaOptions in Test += "-Dspark.port.maxRetries=100",
