Commit e43ac25

Merge branch 'master' into SPARK-8103
2 parents: 6bc23af + 031d7d4

File tree: 226 files changed (+5194, -1293 lines)


R/pkg/NAMESPACE

Lines changed: 1 addition & 0 deletions
@@ -77,6 +77,7 @@ exportMethods("abs",
               "atan",
               "atan2",
               "avg",
+              "between",
               "cast",
               "cbrt",
               "ceiling",

R/pkg/R/DataFrame.R

Lines changed: 1 addition & 1 deletion
@@ -1328,7 +1328,7 @@ setMethod("write.df",
             jmode <- callJStatic("org.apache.spark.sql.api.r.SQLUtils", "saveMode", mode)
             options <- varargsToEnv(...)
             if (!is.null(path)) {
-              options[['path']] = path
+              options[['path']] <- path
             }
             callJMethod(df@sdf, "save", source, jmode, options)
           })

R/pkg/R/client.R

Lines changed: 2 additions & 2 deletions
@@ -36,9 +36,9 @@ connectBackend <- function(hostname, port, timeout = 6000) {

 determineSparkSubmitBin <- function() {
   if (.Platform$OS.type == "unix") {
-    sparkSubmitBinName = "spark-submit"
+    sparkSubmitBinName <- "spark-submit"
   } else {
-    sparkSubmitBinName = "spark-submit.cmd"
+    sparkSubmitBinName <- "spark-submit.cmd"
   }
   sparkSubmitBinName
 }

R/pkg/R/column.R

Lines changed: 17 additions & 0 deletions
@@ -187,6 +187,23 @@ setMethod("substr", signature(x = "Column"),
             column(jc)
           })

+#' between
+#'
+#' Test if the column is between the lower bound and upper bound, inclusive.
+#'
+#' @rdname column
+#'
+#' @param bounds lower and upper bounds
+setMethod("between", signature(x = "Column"),
+          function(x, bounds) {
+            if (is.vector(bounds) && length(bounds) == 2) {
+              jc <- callJMethod(x@jc, "between", bounds[1], bounds[2])
+              column(jc)
+            } else {
+              stop("bounds should be a vector of lower and upper bounds")
+            }
+          })
+
 #' Casts the column to a different data type.
 #'
 #' @rdname column
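Note: the SparkR method above just forwards to the JVM-side Column.between through callJMethod. For reference, a rough sketch of the same inclusive range test from the Scala DataFrame API; the local SparkContext setup and the sample rows are assumptions added here for illustration, not part of this commit:

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.SQLContext
    import org.apache.spark.sql.functions.col

    // Local session just for the sketch; any existing SQLContext works the same way.
    val sc = new SparkContext(new SparkConf().setAppName("between-sketch").setMaster("local[*]"))
    val sqlContext = new SQLContext(sc)

    val df = sqlContext.createDataFrame(Seq(("Michael", 29), ("Andy", 30), ("Justin", 19)))
      .toDF("name", "age")

    // between(lower, upper) is inclusive on both bounds, matching the SparkR docs above.
    df.select(col("name"), col("age").between(20, 30).as("in_20s")).show()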

R/pkg/R/deserialize.R

Lines changed: 1 addition & 0 deletions
@@ -23,6 +23,7 @@
 # Int -> integer
 # String -> character
 # Boolean -> logical
+# Float -> double
 # Double -> double
 # Long -> double
 # Array[Byte] -> raw

R/pkg/R/generics.R

Lines changed: 4 additions & 0 deletions
@@ -567,6 +567,10 @@ setGeneric("asc", function(x) { standardGeneric("asc") })
 #' @export
 setGeneric("avg", function(x, ...) { standardGeneric("avg") })

+#' @rdname column
+#' @export
+setGeneric("between", function(x, bounds) { standardGeneric("between") })
+
 #' @rdname column
 #' @export
 setGeneric("cast", function(x, dataType) { standardGeneric("cast") })

R/pkg/R/group.R

Lines changed: 2 additions & 2 deletions
@@ -87,7 +87,7 @@ setMethod("count",
 setMethod("agg",
           signature(x = "GroupedData"),
           function(x, ...) {
-            cols = list(...)
+            cols <- list(...)
             stopifnot(length(cols) > 0)
             if (is.character(cols[[1]])) {
               cols <- varargsToEnv(...)

@@ -97,7 +97,7 @@ setMethod("agg",
             if (!is.null(ns)) {
               for (n in ns) {
                 if (n != "") {
-                  cols[[n]] = alias(cols[[n]], n)
+                  cols[[n]] <- alias(cols[[n]], n)
                 }
               }
             }

R/pkg/R/schema.R

Lines changed: 1 addition & 0 deletions
@@ -123,6 +123,7 @@ structField.character <- function(x, type, nullable = TRUE) {
   }
   options <- c("byte",
                "integer",
+               "float",
                "double",
                "numeric",
                "character",

R/pkg/R/utils.R

Lines changed: 2 additions & 2 deletions
@@ -41,8 +41,8 @@ convertJListToRList <- function(jList, flatten, logicalUpperBound = NULL,
     if (isInstanceOf(obj, "scala.Tuple2")) {
       # JavaPairRDD[Array[Byte], Array[Byte]].

-      keyBytes = callJMethod(obj, "_1")
-      valBytes = callJMethod(obj, "_2")
+      keyBytes <- callJMethod(obj, "_1")
+      valBytes <- callJMethod(obj, "_2")
       res <- list(unserialize(keyBytes),
                   unserialize(valBytes))
     } else {

R/pkg/inst/tests/test_binaryFile.R

Lines changed: 1 addition & 1 deletion
@@ -20,7 +20,7 @@ context("functions on binary files")
 # JavaSparkContext handle
 sc <- sparkR.init()

-mockFile = c("Spark is pretty.", "Spark is awesome.")
+mockFile <- c("Spark is pretty.", "Spark is awesome.")

 test_that("saveAsObjectFile()/objectFile() following textFile() works", {
   fileName1 <- tempfile(pattern="spark-test", fileext=".tmp")

R/pkg/inst/tests/test_binary_function.R

Lines changed: 1 addition & 1 deletion
@@ -76,7 +76,7 @@ test_that("zipPartitions() on RDDs", {
   expect_equal(actual,
                list(list(1, c(1,2), c(1,2,3)), list(2, c(3,4), c(4,5,6))))

-  mockFile = c("Spark is pretty.", "Spark is awesome.")
+  mockFile <- c("Spark is pretty.", "Spark is awesome.")
   fileName <- tempfile(pattern="spark-test", fileext=".tmp")
   writeLines(mockFile, fileName)

R/pkg/inst/tests/test_rdd.R

Lines changed: 2 additions & 2 deletions
@@ -447,7 +447,7 @@ test_that("zipRDD() on RDDs", {
   expect_equal(actual,
                list(list(0, 1000), list(1, 1001), list(2, 1002), list(3, 1003), list(4, 1004)))

-  mockFile = c("Spark is pretty.", "Spark is awesome.")
+  mockFile <- c("Spark is pretty.", "Spark is awesome.")
   fileName <- tempfile(pattern="spark-test", fileext=".tmp")
   writeLines(mockFile, fileName)

@@ -483,7 +483,7 @@ test_that("cartesian() on RDDs", {
   actual <- collect(cartesian(rdd, emptyRdd))
   expect_equal(actual, list())

-  mockFile = c("Spark is pretty.", "Spark is awesome.")
+  mockFile <- c("Spark is pretty.", "Spark is awesome.")
   fileName <- tempfile(pattern="spark-test", fileext=".tmp")
   writeLines(mockFile, fileName)

R/pkg/inst/tests/test_sparkSQL.R

Lines changed: 38 additions & 0 deletions
@@ -108,6 +108,32 @@ test_that("create DataFrame from RDD", {
   expect_equal(count(df), 10)
   expect_equal(columns(df), c("a", "b"))
   expect_equal(dtypes(df), list(c("a", "int"), c("b", "string")))
+
+  df <- jsonFile(sqlContext, jsonPathNa)
+  hiveCtx <- tryCatch({
+    newJObject("org.apache.spark.sql.hive.test.TestHiveContext", ssc)
+  }, error = function(err) {
+    skip("Hive is not build with SparkSQL, skipped")
+  })
+  sql(hiveCtx, "CREATE TABLE people (name string, age double, height float)")
+  insertInto(df, "people")
+  expect_equal(sql(hiveCtx, "SELECT age from people WHERE name = 'Bob'"), c(16))
+  expect_equal(sql(hiveCtx, "SELECT height from people WHERE name ='Bob'"), c(176.5))
+
+  schema <- structType(structField("name", "string"), structField("age", "integer"),
+                       structField("height", "float"))
+  df2 <- createDataFrame(sqlContext, df.toRDD, schema)
+  expect_equal(columns(df2), c("name", "age", "height"))
+  expect_equal(dtypes(df2), list(c("name", "string"), c("age", "int"), c("height", "float")))
+  expect_equal(collect(where(df2, df2$name == "Bob")), c("Bob", 16, 176.5))
+
+  localDF <- data.frame(name=c("John", "Smith", "Sarah"), age=c(19, 23, 18), height=c(164.10, 181.4, 173.7))
+  df <- createDataFrame(sqlContext, localDF, schema)
+  expect_is(df, "DataFrame")
+  expect_equal(count(df), 3)
+  expect_equal(columns(df), c("name", "age", "height"))
+  expect_equal(dtypes(df), list(c("name", "string"), c("age", "int"), c("height", "float")))
+  expect_equal(collect(where(df, df$name == "John")), c("John", 19, 164.10))
 })

 test_that("convert NAs to null type in DataFrames", {

@@ -612,6 +638,18 @@ test_that("column functions", {
   c7 <- floor(c) + log(c) + log10(c) + log1p(c) + rint(c)
   c8 <- sign(c) + sin(c) + sinh(c) + tan(c) + tanh(c)
   c9 <- toDegrees(c) + toRadians(c)
+
+  df <- jsonFile(sqlContext, jsonPath)
+  df2 <- select(df, between(df$age, c(20, 30)), between(df$age, c(10, 20)))
+  expect_equal(collect(df2)[[2, 1]], TRUE)
+  expect_equal(collect(df2)[[2, 2]], FALSE)
+  expect_equal(collect(df2)[[3, 1]], FALSE)
+  expect_equal(collect(df2)[[3, 2]], TRUE)
+
+  df3 <- select(df, between(df$name, c("Apache", "Spark")))
+  expect_equal(collect(df3)[[1, 1]], TRUE)
+  expect_equal(collect(df3)[[2, 1]], FALSE)
+  expect_equal(collect(df3)[[3, 1]], TRUE)
 })

 test_that("column binary mathfunctions", {

R/pkg/inst/tests/test_textFile.R

Lines changed: 1 addition & 1 deletion
@@ -20,7 +20,7 @@ context("the textFile() function")
 # JavaSparkContext handle
 sc <- sparkR.init()

-mockFile = c("Spark is pretty.", "Spark is awesome.")
+mockFile <- c("Spark is pretty.", "Spark is awesome.")

 test_that("textFile() on a local file returns an RDD", {
   fileName <- tempfile(pattern="spark-test", fileext=".tmp")

R/pkg/inst/tests/test_utils.R

Lines changed: 1 addition & 1 deletion
@@ -119,7 +119,7 @@ test_that("cleanClosure on R functions", {
   # Test for overriding variables in base namespace (Issue: SparkR-196).
   nums <- as.list(1:10)
   rdd <- parallelize(sc, nums, 2L)
-  t = 4  # Override base::t in .GlobalEnv.
+  t <- 4  # Override base::t in .GlobalEnv.
   f <- function(x) { x > t }
   newF <- cleanClosure(f)
   env <- environment(newF)

core/pom.xml

Lines changed: 5 additions & 0 deletions
@@ -372,6 +372,11 @@
       <artifactId>junit-interface</artifactId>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.apache.curator</groupId>
+      <artifactId>curator-test</artifactId>
+      <scope>test</scope>
+    </dependency>
     <dependency>
       <groupId>net.razorvine</groupId>
       <artifactId>pyrolite</artifactId>

core/src/main/java/org/apache/spark/JavaSparkListener.java

Lines changed: 5 additions & 17 deletions
@@ -17,23 +17,7 @@

 package org.apache.spark;

-import org.apache.spark.scheduler.SparkListener;
-import org.apache.spark.scheduler.SparkListenerApplicationEnd;
-import org.apache.spark.scheduler.SparkListenerApplicationStart;
-import org.apache.spark.scheduler.SparkListenerBlockManagerAdded;
-import org.apache.spark.scheduler.SparkListenerBlockManagerRemoved;
-import org.apache.spark.scheduler.SparkListenerEnvironmentUpdate;
-import org.apache.spark.scheduler.SparkListenerExecutorAdded;
-import org.apache.spark.scheduler.SparkListenerExecutorMetricsUpdate;
-import org.apache.spark.scheduler.SparkListenerExecutorRemoved;
-import org.apache.spark.scheduler.SparkListenerJobEnd;
-import org.apache.spark.scheduler.SparkListenerJobStart;
-import org.apache.spark.scheduler.SparkListenerStageCompleted;
-import org.apache.spark.scheduler.SparkListenerStageSubmitted;
-import org.apache.spark.scheduler.SparkListenerTaskEnd;
-import org.apache.spark.scheduler.SparkListenerTaskGettingResult;
-import org.apache.spark.scheduler.SparkListenerTaskStart;
-import org.apache.spark.scheduler.SparkListenerUnpersistRDD;
+import org.apache.spark.scheduler.*;

 /**
  * Java clients should extend this class instead of implementing

@@ -94,4 +78,8 @@ public void onExecutorAdded(SparkListenerExecutorAdded executorAdded) { }

   @Override
   public void onExecutorRemoved(SparkListenerExecutorRemoved executorRemoved) { }
+
+  @Override
+  public void onBlockUpdated(SparkListenerBlockUpdated blockUpdated) { }
+
 }

core/src/main/java/org/apache/spark/SparkFirehoseListener.java

Lines changed: 6 additions & 0 deletions
@@ -112,4 +112,10 @@ public final void onExecutorAdded(SparkListenerExecutorAdded executorAdded) {
   public final void onExecutorRemoved(SparkListenerExecutorRemoved executorRemoved) {
     onEvent(executorRemoved);
   }
+
+  @Override
+  public void onBlockUpdated(SparkListenerBlockUpdated blockUpdated) {
+    onEvent(blockUpdated);
+  }
+
 }
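Note: both adapter classes now forward the new SparkListenerBlockUpdated event. A minimal sketch of a Scala listener that consumes it; the class name, the println-based logging, and the registration line are illustrative assumptions, not code from this commit:

    import org.apache.spark.scheduler.{SparkListener, SparkListenerBlockUpdated}

    // Illustrative listener that reacts to block updates reported by block managers.
    class BlockUpdateLogger extends SparkListener {
      override def onBlockUpdated(blockUpdated: SparkListenerBlockUpdated): Unit = {
        // A real listener would use a proper logger instead of println.
        println(s"Block update event received: $blockUpdated")
      }
    }

    // Registration, assuming a running SparkContext named `sc`:
    // sc.addSparkListener(new BlockUpdateLogger())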

core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java

Lines changed: 4 additions & 4 deletions
@@ -75,7 +75,7 @@ final class BypassMergeSortShuffleWriter<K, V> implements SortShuffleFileWriter<
   private final Serializer serializer;

   /** Array of file writers, one for each partition */
-  private BlockObjectWriter[] partitionWriters;
+  private DiskBlockObjectWriter[] partitionWriters;

   public BypassMergeSortShuffleWriter(
       SparkConf conf,

@@ -101,7 +101,7 @@ public void insertAll(Iterator<Product2<K, V>> records) throws IOException {
     }
     final SerializerInstance serInstance = serializer.newInstance();
     final long openStartTime = System.nanoTime();
-    partitionWriters = new BlockObjectWriter[numPartitions];
+    partitionWriters = new DiskBlockObjectWriter[numPartitions];
     for (int i = 0; i < numPartitions; i++) {
       final Tuple2<TempShuffleBlockId, File> tempShuffleBlockIdPlusFile =
         blockManager.diskBlockManager().createTempShuffleBlock();

@@ -121,7 +121,7 @@ public void insertAll(Iterator<Product2<K, V>> records) throws IOException {
       partitionWriters[partitioner.getPartition(key)].write(key, record._2());
     }

-    for (BlockObjectWriter writer : partitionWriters) {
+    for (DiskBlockObjectWriter writer : partitionWriters) {
      writer.commitAndClose();
    }
  }

@@ -169,7 +169,7 @@ public void stop() throws IOException {
     if (partitionWriters != null) {
       try {
         final DiskBlockManager diskBlockManager = blockManager.diskBlockManager();
-        for (BlockObjectWriter writer : partitionWriters) {
+        for (DiskBlockObjectWriter writer : partitionWriters) {
           // This method explicitly does _not_ throw exceptions:
           writer.revertPartialWritesAndClose();
           if (!diskBlockManager.getFile(writer.blockId()).delete()) {

core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleExternalSorter.java

Lines changed: 1 addition & 1 deletion
@@ -157,7 +157,7 @@ private void writeSortedFile(boolean isLastFile) throws IOException {

     // Currently, we need to open a new DiskBlockObjectWriter for each partition; we can avoid this
     // after SPARK-5581 is fixed.
-    BlockObjectWriter writer;
+    DiskBlockObjectWriter writer;

     // Small writes to DiskBlockObjectWriter will be fairly inefficient. Since there doesn't seem to
     // be an API to directly transfer bytes from managed memory to the disk writer, we buffer

core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillWriter.java

Lines changed: 2 additions & 2 deletions
@@ -26,7 +26,7 @@
 import org.apache.spark.serializer.DummySerializerInstance;
 import org.apache.spark.storage.BlockId;
 import org.apache.spark.storage.BlockManager;
-import org.apache.spark.storage.BlockObjectWriter;
+import org.apache.spark.storage.DiskBlockObjectWriter;
 import org.apache.spark.storage.TempLocalBlockId;
 import org.apache.spark.unsafe.PlatformDependent;

@@ -47,7 +47,7 @@ final class UnsafeSorterSpillWriter {
   private final File file;
   private final BlockId blockId;
   private final int numRecordsToWrite;
-  private BlockObjectWriter writer;
+  private DiskBlockObjectWriter writer;
   private int numRecordsSpilled = 0;

   public UnsafeSorterSpillWriter(

core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala

Lines changed: 11 additions & 1 deletion
@@ -20,6 +20,7 @@ package org.apache.spark
 import java.util.concurrent.TimeUnit

 import scala.collection.mutable
+import scala.util.control.ControlThrowable

 import com.codahale.metrics.{Gauge, MetricRegistry}

@@ -211,7 +212,16 @@ private[spark] class ExecutorAllocationManager(
     listenerBus.addListener(listener)

     val scheduleTask = new Runnable() {
-      override def run(): Unit = Utils.logUncaughtExceptions(schedule())
+      override def run(): Unit = {
+        try {
+          schedule()
+        } catch {
+          case ct: ControlThrowable =>
+            throw ct
+          case t: Throwable =>
+            logWarning(s"Uncaught exception in thread ${Thread.currentThread().getName}", t)
+        }
+      }
     }
     executor.scheduleAtFixedRate(scheduleTask, 0, intervalMillis, TimeUnit.MILLISECONDS)
   }
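Note: the new run body replaces Utils.logUncaughtExceptions with an inline handler that rethrows ControlThrowable (the marker used by Scala's break and non-local return) and only logs other uncaught throwables, so the periodic scheduling thread survives failures in schedule(). A self-contained sketch of the same pattern as a reusable helper; the helper name and println logging are illustrative, not Spark API:

    import scala.util.control.ControlThrowable

    // Illustrative helper: run a block, let flow-control throwables propagate,
    // and log any other uncaught Throwable instead of killing the calling thread.
    def runAndLogNonFatal(body: => Unit): Unit = {
      try {
        body
      } catch {
        case ct: ControlThrowable => throw ct
        case t: Throwable =>
          println(s"Uncaught exception in thread ${Thread.currentThread().getName}: $t")
      }
    }

    // Usage sketch for a periodic task body:
    runAndLogNonFatal { /* schedule() or any recurring work */ }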

core/src/main/scala/org/apache/spark/Logging.scala

Lines changed: 1 addition & 1 deletion
@@ -159,7 +159,7 @@ private object Logging {
     try {
       // We use reflection here to handle the case where users remove the
       // slf4j-to-jul bridge order to route their logs to JUL.
-      val bridgeClass = Class.forName("org.slf4j.bridge.SLF4JBridgeHandler")
+      val bridgeClass = Utils.classForName("org.slf4j.bridge.SLF4JBridgeHandler")
       bridgeClass.getMethod("removeHandlersForRootLogger").invoke(null)
       val installed = bridgeClass.getMethod("isInstalled").invoke(null).asInstanceOf[Boolean]
       if (!installed) {
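Note: Class.forName resolves against the caller's defining class loader, which can miss classes when Spark runs behind a custom or thread-context loader; Utils.classForName resolves against Spark's preferred loader instead. A rough sketch of the idea; the object below is illustrative and not the actual Utils implementation:

    // Illustrative only: prefer the thread's context class loader when one is set,
    // falling back to this class's own loader otherwise.
    object LoaderAwareReflection {
      def classForName(name: String): Class[_] = {
        val loader = Option(Thread.currentThread().getContextClassLoader)
          .getOrElse(getClass.getClassLoader)
        Class.forName(name, true, loader) // initialize = true mirrors plain Class.forName
      }
    }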

core/src/main/scala/org/apache/spark/Partitioner.scala

Lines changed: 2 additions & 0 deletions
@@ -76,6 +76,8 @@ object Partitioner {
  * produce an unexpected or incorrect result.
  */
 class HashPartitioner(partitions: Int) extends Partitioner {
+  require(partitions >= 0, s"Number of partitions ($partitions) cannot be negative.")
+
   def numPartitions: Int = partitions

   def getPartition(key: Any): Int = key match {
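Note: the added require turns a negative partition count into an immediate IllegalArgumentException at construction time instead of a confusing modulo error later. A standalone sketch of the same guard on a toy partitioner; the class below is illustrative, not the Spark implementation:

    // Illustrative toy partitioner mirroring the guard added above.
    class ToyHashPartitioner(partitions: Int) {
      require(partitions >= 0, s"Number of partitions ($partitions) cannot be negative.")

      def numPartitions: Int = partitions

      // Non-negative modulo keeps negative hash codes inside [0, partitions).
      def getPartition(key: Any): Int = {
        val raw = key.hashCode % partitions
        if (raw < 0) raw + partitions else raw
      }
    }

    // new ToyHashPartitioner(-1) now fails fast with IllegalArgumentException.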
