Skip to content

Commit 9187066

Browse files
author
Andrew Or
committed
Merge branch 'master' of github.com:apache/spark into closure-cleaner
2 parents d889950 + 53befac commit 9187066

File tree

147 files changed

+5926
-1017
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

147 files changed

+5926
-1017
lines changed

R/pkg/NAMESPACE

+1
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@ exportMethods(
7171
"unpersist",
7272
"value",
7373
"values",
74+
"zipPartitions",
7475
"zipRDD",
7576
"zipWithIndex",
7677
"zipWithUniqueId"

R/pkg/R/RDD.R

+51
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,11 @@ setMethod("initialize", "RDD", function(.Object, jrdd, serializedMode,
6666
.Object
6767
})
6868

69+
setMethod("show", "RDD",
70+
function(.Object) {
71+
cat(paste(callJMethod(.Object@jrdd, "toString"), "\n", sep=""))
72+
})
73+
6974
setMethod("initialize", "PipelinedRDD", function(.Object, prev, func, jrdd_val) {
7075
.Object@env <- new.env()
7176
.Object@env$isCached <- FALSE
@@ -1590,3 +1595,49 @@ setMethod("intersection",
15901595

15911596
keys(filterRDD(cogroup(rdd1, rdd2, numPartitions = numPartitions), filterFunction))
15921597
})
1598+
1599+
#' Zips an RDD's partitions with one (or more) RDD(s).
1600+
#' Same as zipPartitions in Spark.
1601+
#'
1602+
#' @param ... RDDs to be zipped.
1603+
#' @param func A function to transform zipped partitions.
1604+
#' @return A new RDD by applying a function to the zipped partitions.
1605+
#' Assumes that all the RDDs have the *same number of partitions*, but
1606+
#' does *not* require them to have the same number of elements in each partition.
1607+
#' @examples
1608+
#'\dontrun{
1609+
#' sc <- sparkR.init()
1610+
#' rdd1 <- parallelize(sc, 1:2, 2L) # 1, 2
1611+
#' rdd2 <- parallelize(sc, 1:4, 2L) # 1:2, 3:4
1612+
#' rdd3 <- parallelize(sc, 1:6, 2L) # 1:3, 4:6
1613+
#' collect(zipPartitions(rdd1, rdd2, rdd3,
1614+
#' func = function(x, y, z) { list(list(x, y, z))} ))
1615+
#' # list(list(1, c(1,2), c(1,2,3)), list(2, c(3,4), c(4,5,6)))
1616+
#'}
1617+
#' @rdname zipRDD
1618+
#' @aliases zipPartitions,RDD
1619+
setMethod("zipPartitions",
1620+
"RDD",
1621+
function(..., func) {
1622+
rrdds <- list(...)
1623+
if (length(rrdds) == 1) {
1624+
return(rrdds[[1]])
1625+
}
1626+
nPart <- sapply(rrdds, numPartitions)
1627+
if (length(unique(nPart)) != 1) {
1628+
stop("Can only zipPartitions RDDs which have the same number of partitions.")
1629+
}
1630+
1631+
rrdds <- lapply(rrdds, function(rdd) {
1632+
mapPartitionsWithIndex(rdd, function(partIndex, part) {
1633+
print(length(part))
1634+
list(list(partIndex, part))
1635+
})
1636+
})
1637+
union.rdd <- Reduce(unionRDD, rrdds)
1638+
zipped.rdd <- values(groupByKey(union.rdd, numPartitions = nPart[1]))
1639+
res <- mapPartitions(zipped.rdd, function(plist) {
1640+
do.call(func, plist[[1]])
1641+
})
1642+
res
1643+
})

R/pkg/R/generics.R

+5
Original file line numberDiff line numberDiff line change
@@ -217,6 +217,11 @@ setGeneric("unpersist", function(x, ...) { standardGeneric("unpersist") })
217217
#' @export
218218
setGeneric("zipRDD", function(x, other) { standardGeneric("zipRDD") })
219219

220+
#' @rdname zipRDD
221+
#' @export
222+
setGeneric("zipPartitions", function(..., func) { standardGeneric("zipPartitions") },
223+
signature = "...")
224+
220225
#' @rdname zipWithIndex
221226
#' @seealso zipWithUniqueId
222227
#' @export

R/pkg/inst/tests/test_binary_function.R

+33
Original file line numberDiff line numberDiff line change
@@ -66,3 +66,36 @@ test_that("cogroup on two RDDs", {
6666
expect_equal(sortKeyValueList(actual),
6767
sortKeyValueList(expected))
6868
})
69+
70+
test_that("zipPartitions() on RDDs", {
71+
rdd1 <- parallelize(sc, 1:2, 2L) # 1, 2
72+
rdd2 <- parallelize(sc, 1:4, 2L) # 1:2, 3:4
73+
rdd3 <- parallelize(sc, 1:6, 2L) # 1:3, 4:6
74+
actual <- collect(zipPartitions(rdd1, rdd2, rdd3,
75+
func = function(x, y, z) { list(list(x, y, z))} ))
76+
expect_equal(actual,
77+
list(list(1, c(1,2), c(1,2,3)), list(2, c(3,4), c(4,5,6))))
78+
79+
mockFile = c("Spark is pretty.", "Spark is awesome.")
80+
fileName <- tempfile(pattern="spark-test", fileext=".tmp")
81+
writeLines(mockFile, fileName)
82+
83+
rdd <- textFile(sc, fileName, 1)
84+
actual <- collect(zipPartitions(rdd, rdd,
85+
func = function(x, y) { list(paste(x, y, sep = "\n")) }))
86+
expected <- list(paste(mockFile, mockFile, sep = "\n"))
87+
expect_equal(actual, expected)
88+
89+
rdd1 <- parallelize(sc, 0:1, 1)
90+
actual <- collect(zipPartitions(rdd1, rdd,
91+
func = function(x, y) { list(x + nchar(y)) }))
92+
expected <- list(0:1 + nchar(mockFile))
93+
expect_equal(actual, expected)
94+
95+
rdd <- map(rdd, function(x) { x })
96+
actual <- collect(zipPartitions(rdd, rdd1,
97+
func = function(x, y) { list(y + nchar(x)) }))
98+
expect_equal(actual, expected)
99+
100+
unlink(fileName)
101+
})

R/pkg/inst/tests/test_rdd.R

+5
Original file line numberDiff line numberDiff line change
@@ -759,6 +759,11 @@ test_that("collectAsMap() on a pairwise RDD", {
759759
expect_equal(vals, list(`1` = "a", `2` = "b"))
760760
})
761761

762+
test_that("show()", {
763+
rdd <- parallelize(sc, list(1:10))
764+
expect_output(show(rdd), "ParallelCollectionRDD\\[\\d+\\] at parallelize at RRDD\\.scala:\\d+")
765+
})
766+
762767
test_that("sampleByKey() on pairwise RDDs", {
763768
rdd <- parallelize(sc, 1:2000)
764769
pairsRDD <- lapply(rdd, function(x) { if (x %% 2 == 0) list("a", x) else list("b", x) })

assembly/pom.xml

-1
Original file line numberDiff line numberDiff line change
@@ -194,7 +194,6 @@
194194
<plugin>
195195
<groupId>org.apache.maven.plugins</groupId>
196196
<artifactId>maven-assembly-plugin</artifactId>
197-
<version>2.4</version>
198197
<executions>
199198
<execution>
200199
<id>dist</id>

bin/spark-class2.cmd

+4-1
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,10 @@ if not "x%JAVA_HOME%"=="x" set RUNNER=%JAVA_HOME%\bin\java
6161

6262
rem The launcher library prints the command to be executed in a single line suitable for being
6363
rem executed by the batch interpreter. So read all the output of the launcher into a variable.
64-
for /f "tokens=*" %%i in ('cmd /C ""%RUNNER%" -cp %LAUNCH_CLASSPATH% org.apache.spark.launcher.Main %*"') do (
64+
set LAUNCHER_OUTPUT=%temp%\spark-class-launcher-output-%RANDOM%.txt
65+
"%RUNNER%" -cp %LAUNCH_CLASSPATH% org.apache.spark.launcher.Main %* > %LAUNCHER_OUTPUT%
66+
for /f "tokens=*" %%i in (%LAUNCHER_OUTPUT%) do (
6567
set SPARK_CMD=%%i
6668
)
69+
del %LAUNCHER_OUTPUT%
6770
%SPARK_CMD%

conf/spark-env.sh.template

+2-1
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
# This file is sourced when running various Spark programs.
44
# Copy it as spark-env.sh and edit that to configure Spark for your site.
55

6-
# Options read when launching programs locally with
6+
# Options read when launching programs locally with
77
# ./bin/run-example or ./bin/spark-submit
88
# - HADOOP_CONF_DIR, to point Spark towards Hadoop configuration files
99
# - SPARK_LOCAL_IP, to set the IP address Spark binds to on this node
@@ -39,6 +39,7 @@
3939
# - SPARK_WORKER_DIR, to set the working directory of worker processes
4040
# - SPARK_WORKER_OPTS, to set config properties only for the worker (e.g. "-Dx=y")
4141
# - SPARK_HISTORY_OPTS, to set config properties only for the history server (e.g. "-Dx=y")
42+
# - SPARK_SHUFFLE_OPTS, to set config properties only for the external shuffle service (e.g. "-Dx=y")
4243
# - SPARK_DAEMON_JAVA_OPTS, to set config properties for all daemons (e.g. "-Dx=y")
4344
# - SPARK_PUBLIC_DNS, to set the public dns name of the master or workers
4445

core/pom.xml

-1
Original file line numberDiff line numberDiff line change
@@ -478,7 +478,6 @@
478478
<plugin>
479479
<groupId>org.codehaus.mojo</groupId>
480480
<artifactId>exec-maven-plugin</artifactId>
481-
<version>1.3.2</version>
482481
<executions>
483482
<execution>
484483
<id>sparkr-pkg</id>

core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala

+17-9
Original file line numberDiff line numberDiff line change
@@ -76,13 +76,15 @@ private[spark] class HeartbeatReceiver(sc: SparkContext)
7676

7777
private var timeoutCheckingTask: ScheduledFuture[_] = null
7878

79-
private val timeoutCheckingThread =
80-
ThreadUtils.newDaemonSingleThreadScheduledExecutor("heartbeat-timeout-checking-thread")
79+
// "eventLoopThread" is used to run some pretty fast actions. The actions running in it should not
80+
// block the thread for a long time.
81+
private val eventLoopThread =
82+
ThreadUtils.newDaemonSingleThreadScheduledExecutor("heartbeat-receiver-event-loop-thread")
8183

8284
private val killExecutorThread = ThreadUtils.newDaemonSingleThreadExecutor("kill-executor-thread")
8385

8486
override def onStart(): Unit = {
85-
timeoutCheckingTask = timeoutCheckingThread.scheduleAtFixedRate(new Runnable {
87+
timeoutCheckingTask = eventLoopThread.scheduleAtFixedRate(new Runnable {
8688
override def run(): Unit = Utils.tryLogNonFatalError {
8789
Option(self).foreach(_.send(ExpireDeadHosts))
8890
}
@@ -99,11 +101,15 @@ private[spark] class HeartbeatReceiver(sc: SparkContext)
99101
override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
100102
case heartbeat @ Heartbeat(executorId, taskMetrics, blockManagerId) =>
101103
if (scheduler != null) {
102-
val unknownExecutor = !scheduler.executorHeartbeatReceived(
103-
executorId, taskMetrics, blockManagerId)
104-
val response = HeartbeatResponse(reregisterBlockManager = unknownExecutor)
105104
executorLastSeen(executorId) = System.currentTimeMillis()
106-
context.reply(response)
105+
eventLoopThread.submit(new Runnable {
106+
override def run(): Unit = Utils.tryLogNonFatalError {
107+
val unknownExecutor = !scheduler.executorHeartbeatReceived(
108+
executorId, taskMetrics, blockManagerId)
109+
val response = HeartbeatResponse(reregisterBlockManager = unknownExecutor)
110+
context.reply(response)
111+
}
112+
})
107113
} else {
108114
// Because Executor will sleep several seconds before sending the first "Heartbeat", this
109115
// case rarely happens. However, if it really happens, log it and ask the executor to
@@ -125,7 +131,9 @@ private[spark] class HeartbeatReceiver(sc: SparkContext)
125131
if (sc.supportDynamicAllocation) {
126132
// Asynchronously kill the executor to avoid blocking the current thread
127133
killExecutorThread.submit(new Runnable {
128-
override def run(): Unit = sc.killExecutor(executorId)
134+
override def run(): Unit = Utils.tryLogNonFatalError {
135+
sc.killExecutor(executorId)
136+
}
129137
})
130138
}
131139
executorLastSeen.remove(executorId)
@@ -137,7 +145,7 @@ private[spark] class HeartbeatReceiver(sc: SparkContext)
137145
if (timeoutCheckingTask != null) {
138146
timeoutCheckingTask.cancel(true)
139147
}
140-
timeoutCheckingThread.shutdownNow()
148+
eventLoopThread.shutdownNow()
141149
killExecutorThread.shutdownNow()
142150
}
143151
}

core/src/main/scala/org/apache/spark/SparkConf.scala

+89-1
Original file line numberDiff line numberDiff line change
@@ -211,7 +211,74 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
211211
Utils.timeStringAsMs(get(key, defaultValue))
212212
}
213213

214+
/**
215+
* Get a size parameter as bytes; throws a NoSuchElementException if it's not set. If no
216+
* suffix is provided then bytes are assumed.
217+
* @throws NoSuchElementException
218+
*/
219+
def getSizeAsBytes(key: String): Long = {
220+
Utils.byteStringAsBytes(get(key))
221+
}
222+
223+
/**
224+
* Get a size parameter as bytes, falling back to a default if not set. If no
225+
* suffix is provided then bytes are assumed.
226+
*/
227+
def getSizeAsBytes(key: String, defaultValue: String): Long = {
228+
Utils.byteStringAsBytes(get(key, defaultValue))
229+
}
230+
231+
/**
232+
* Get a size parameter as Kibibytes; throws a NoSuchElementException if it's not set. If no
233+
* suffix is provided then Kibibytes are assumed.
234+
* @throws NoSuchElementException
235+
*/
236+
def getSizeAsKb(key: String): Long = {
237+
Utils.byteStringAsKb(get(key))
238+
}
239+
240+
/**
241+
* Get a size parameter as Kibibytes, falling back to a default if not set. If no
242+
* suffix is provided then Kibibytes are assumed.
243+
*/
244+
def getSizeAsKb(key: String, defaultValue: String): Long = {
245+
Utils.byteStringAsKb(get(key, defaultValue))
246+
}
247+
248+
/**
249+
* Get a size parameter as Mebibytes; throws a NoSuchElementException if it's not set. If no
250+
* suffix is provided then Mebibytes are assumed.
251+
* @throws NoSuchElementException
252+
*/
253+
def getSizeAsMb(key: String): Long = {
254+
Utils.byteStringAsMb(get(key))
255+
}
256+
257+
/**
258+
* Get a size parameter as Mebibytes, falling back to a default if not set. If no
259+
* suffix is provided then Mebibytes are assumed.
260+
*/
261+
def getSizeAsMb(key: String, defaultValue: String): Long = {
262+
Utils.byteStringAsMb(get(key, defaultValue))
263+
}
264+
265+
/**
266+
* Get a size parameter as Gibibytes; throws a NoSuchElementException if it's not set. If no
267+
* suffix is provided then Gibibytes are assumed.
268+
* @throws NoSuchElementException
269+
*/
270+
def getSizeAsGb(key: String): Long = {
271+
Utils.byteStringAsGb(get(key))
272+
}
214273

274+
/**
275+
* Get a size parameter as Gibibytes, falling back to a default if not set. If no
276+
* suffix is provided then Gibibytes are assumed.
277+
*/
278+
def getSizeAsGb(key: String, defaultValue: String): Long = {
279+
Utils.byteStringAsGb(get(key, defaultValue))
280+
}
281+
215282
/** Get a parameter as an Option */
216283
def getOption(key: String): Option[String] = {
217284
Option(settings.get(key)).orElse(getDeprecatedConfig(key, this))
@@ -407,7 +474,13 @@ private[spark] object SparkConf extends Logging {
407474
"The spark.cache.class property is no longer being used! Specify storage levels using " +
408475
"the RDD.persist() method instead."),
409476
DeprecatedConfig("spark.yarn.user.classpath.first", "1.3",
410-
"Please use spark.{driver,executor}.userClassPathFirst instead."))
477+
"Please use spark.{driver,executor}.userClassPathFirst instead."),
478+
DeprecatedConfig("spark.kryoserializer.buffer.mb", "1.4",
479+
"Please use spark.kryoserializer.buffer instead. The default value for " +
480+
"spark.kryoserializer.buffer.mb was previously specified as '0.064'. Fractional values " +
481+
"are no longer accepted. To specify the equivalent now, one may use '64k'.")
482+
)
483+
411484
Map(configs.map { cfg => (cfg.key -> cfg) }:_*)
412485
}
413486

@@ -432,6 +505,21 @@ private[spark] object SparkConf extends Logging {
432505
AlternateConfig("spark.yarn.applicationMaster.waitTries", "1.3",
433506
// Translate old value to a duration, with 10s wait time per try.
434507
translation = s => s"${s.toLong * 10}s")),
508+
"spark.reducer.maxSizeInFlight" -> Seq(
509+
AlternateConfig("spark.reducer.maxMbInFlight", "1.4")),
510+
"spark.kryoserializer.buffer" ->
511+
Seq(AlternateConfig("spark.kryoserializer.buffer.mb", "1.4",
512+
translation = s => s"${s.toDouble * 1000}k")),
513+
"spark.kryoserializer.buffer.max" -> Seq(
514+
AlternateConfig("spark.kryoserializer.buffer.max.mb", "1.4")),
515+
"spark.shuffle.file.buffer" -> Seq(
516+
AlternateConfig("spark.shuffle.file.buffer.kb", "1.4")),
517+
"spark.executor.logs.rolling.maxSize" -> Seq(
518+
AlternateConfig("spark.executor.logs.rolling.size.maxBytes", "1.4")),
519+
"spark.io.compression.snappy.blockSize" -> Seq(
520+
AlternateConfig("spark.io.compression.snappy.block.size", "1.4")),
521+
"spark.io.compression.lz4.blockSize" -> Seq(
522+
AlternateConfig("spark.io.compression.lz4.block.size", "1.4")),
435523
"spark.rpc.numRetries" -> Seq(
436524
AlternateConfig("spark.akka.num.retries", "1.4")),
437525
"spark.rpc.retry.wait" -> Seq(

0 commit comments

Comments
 (0)