Commit c726bd9

Merge branch 'master' of https://github.com/apache/spark into master_nravi
Conflicts: yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala
2 parents: f00fa31 + 74fb2ec

File tree: 63 files changed, +1711 -823 lines

Note: large commits have some content hidden by default; only a subset of the 63 changed files is shown below.


assembly/pom.xml

Lines changed: 3 additions & 1 deletion
@@ -141,7 +141,9 @@
                 <include>com.google.common.**</include>
               </includes>
               <excludes>
-                <exclude>com.google.common.base.Optional**</exclude>
+                <exclude>com/google/common/base/Absent*</exclude>
+                <exclude>com/google/common/base/Optional*</exclude>
+                <exclude>com/google/common/base/Present*</exclude>
               </excludes>
             </relocation>
           </relocations>

core/pom.xml

Lines changed: 2 additions & 0 deletions
@@ -343,7 +343,9 @@
             <filter>
               <artifact>com.google.guava:guava</artifact>
               <includes>
+                <include>com/google/common/base/Absent*</include>
                 <include>com/google/common/base/Optional*</include>
+                <include>com/google/common/base/Present*</include>
               </includes>
             </filter>
           </filters>
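
Context for the pair of build changes above (not part of the commit's diff itself): Guava's Optional.of() and Optional.absent() return instances of the package-private Present and Absent subclasses, so relocating or dropping either one while keeping Optional un-relocated fails at runtime. A minimal Scala sketch of the behavior the shading rules have to preserve, assuming the Spark assembly (with relocated Guava) is on the classpath:

import com.google.common.base.Optional

// Optional.of(...) is backed by the package-private Present class and
// Optional.absent() by Absent; all three must stay in com.google.common.base.
val some: Optional[Integer] = Optional.of(Int.box(1))
val none: Optional[Integer] = Optional.absent()
println(some.isPresent)  // true
println(none.isPresent)  // false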

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 22 additions & 10 deletions
@@ -1030,28 +1030,40 @@ class SparkContext(config: SparkConf) extends Logging {
   }
 
   /**
-   * Support function for API backtraces.
+   * Set the thread-local property for overriding the call sites
+   * of actions and RDDs.
    */
-  def setCallSite(site: String) {
-    setLocalProperty("externalCallSite", site)
+  def setCallSite(shortCallSite: String) {
+    setLocalProperty(CallSite.SHORT_FORM, shortCallSite)
   }
 
   /**
-   * Support function for API backtraces.
+   * Set the thread-local property for overriding the call sites
+   * of actions and RDDs.
+   */
+  private[spark] def setCallSite(callSite: CallSite) {
+    setLocalProperty(CallSite.SHORT_FORM, callSite.shortForm)
+    setLocalProperty(CallSite.LONG_FORM, callSite.longForm)
+  }
+
+  /**
+   * Clear the thread-local property for overriding the call sites
+   * of actions and RDDs.
    */
   def clearCallSite() {
-    setLocalProperty("externalCallSite", null)
+    setLocalProperty(CallSite.SHORT_FORM, null)
+    setLocalProperty(CallSite.LONG_FORM, null)
   }
 
   /**
    * Capture the current user callsite and return a formatted version for printing. If the user
-   * has overridden the call site, this will return the user's version.
+   * has overridden the call site using `setCallSite()`, this will return the user's version.
    */
   private[spark] def getCallSite(): CallSite = {
-    Option(getLocalProperty("externalCallSite")) match {
-      case Some(callSite) => CallSite(callSite, longForm = "")
-      case None => Utils.getCallSite
-    }
+    Option(getLocalProperty(CallSite.SHORT_FORM)).map { case shortCallSite =>
+      val longCallSite = Option(getLocalProperty(CallSite.LONG_FORM)).getOrElse("")
+      CallSite(shortCallSite, longCallSite)
+    }.getOrElse(Utils.getCallSite())
   }
 
   /**
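
As an aside (not part of the diff), a hedged sketch of how the new thread-local override is meant to be used from user or wrapper code, assuming an existing SparkContext named sc; the label and path below are made up:

// Override the call site recorded for RDDs/actions created on this thread,
// then restore automatic capture afterwards.
sc.setCallSite("myApp: load input")            // stored under CallSite.SHORT_FORM
val input = sc.textFile("hdfs://host/path")    // this RDD's call site reads "myApp: load input"
sc.clearCallSite()                             // subsequent RDDs capture the real call site again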

core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala

Lines changed: 4 additions & 0 deletions
@@ -75,6 +75,10 @@ private[spark] class SparkSubmitArguments(args: Seq[String]) {
     defaultProperties
   }
 
+  // Respect SPARK_*_MEMORY for cluster mode
+  driverMemory = sys.env.get("SPARK_DRIVER_MEMORY").orNull
+  executorMemory = sys.env.get("SPARK_EXECUTOR_MEMORY").orNull
+
   parseOpts(args.toList)
   mergeSparkProperties()
   checkRequiredArguments()
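
Because the environment variables are read before parseOpts runs, they only seed the defaults; an explicit flag parsed afterwards replaces them. A simplified, illustrative sketch of that precedence (this is not the real parser):

val cliArgs = Seq("--driver-memory", "4g")                     // hypothetical command line
var driverMemory = sys.env.get("SPARK_DRIVER_MEMORY").orNull   // env default, possibly null
cliArgs.sliding(2, 2).foreach {
  case Seq("--driver-memory", value) => driverMemory = value   // explicit flag wins
  case _ =>
}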

core/src/main/scala/org/apache/spark/executor/Executor.scala

Lines changed: 11 additions & 5 deletions
@@ -24,6 +24,7 @@ import java.util.concurrent._
 
 import scala.collection.JavaConversions._
 import scala.collection.mutable.{ArrayBuffer, HashMap}
+import scala.util.control.NonFatal
 
 import org.apache.spark._
 import org.apache.spark.deploy.SparkHadoopUtil
@@ -375,12 +376,17 @@ private[spark] class Executor(
         }
 
         val message = Heartbeat(executorId, tasksMetrics.toArray, env.blockManager.blockManagerId)
-        val response = AkkaUtils.askWithReply[HeartbeatResponse](message, heartbeatReceiverRef,
-          retryAttempts, retryIntervalMs, timeout)
-        if (response.reregisterBlockManager) {
-          logWarning("Told to re-register on heartbeat")
-          env.blockManager.reregister()
+        try {
+          val response = AkkaUtils.askWithReply[HeartbeatResponse](message, heartbeatReceiverRef,
+            retryAttempts, retryIntervalMs, timeout)
+          if (response.reregisterBlockManager) {
+            logWarning("Told to re-register on heartbeat")
+            env.blockManager.reregister()
+          }
+        } catch {
+          case NonFatal(t) => logWarning("Issue communicating with driver in heartbeater", t)
         }
+
         Thread.sleep(interval)
       }
     }
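
This keeps the heartbeater thread alive across transient driver-communication failures while still letting fatal JVM errors propagate, since NonFatal does not match errors such as OutOfMemoryError. An illustrative, self-contained sketch of the same pattern (not Spark code):

import scala.util.control.NonFatal

// Run a periodic task forever, logging and retrying on recoverable failures.
def loopWithRetries(intervalMs: Long)(body: => Unit): Unit = {
  while (true) {
    try {
      body
    } catch {
      case NonFatal(t) => println(s"recoverable failure, will retry: $t")
    }
    Thread.sleep(intervalMs)
  }
}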

core/src/main/scala/org/apache/spark/rdd/RDD.scala

Lines changed: 4 additions & 3 deletions
@@ -17,7 +17,7 @@
 
 package org.apache.spark.rdd
 
-import java.util.Random
+import java.util.{Properties, Random}
 
 import scala.collection.{mutable, Map}
 import scala.collection.mutable.ArrayBuffer
@@ -41,7 +41,7 @@ import org.apache.spark.partial.CountEvaluator
 import org.apache.spark.partial.GroupedCountEvaluator
 import org.apache.spark.partial.PartialResult
 import org.apache.spark.storage.StorageLevel
-import org.apache.spark.util.{BoundedPriorityQueue, Utils}
+import org.apache.spark.util.{BoundedPriorityQueue, Utils, CallSite}
 import org.apache.spark.util.collection.OpenHashMap
 import org.apache.spark.util.random.{BernoulliSampler, PoissonSampler, SamplingUtils}
 
@@ -1224,7 +1224,8 @@ abstract class RDD[T: ClassTag](
   private var storageLevel: StorageLevel = StorageLevel.NONE
 
   /** User code that created this RDD (e.g. `textFile`, `parallelize`). */
-  @transient private[spark] val creationSite = Utils.getCallSite
+  @transient private[spark] val creationSite = sc.getCallSite()
+
   private[spark] def getCreationSite: String = Option(creationSite).map(_.shortForm).getOrElse("")
 
   private[spark] def elementClassTag: ClassTag[T] = classTag[T]
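
Switching from Utils.getCallSite to sc.getCallSite() means an RDD's recorded creation site now honors any thread-local override set via SparkContext.setCallSite. Illustrative only, assuming an existing SparkContext sc (the label is made up); the recorded short form is what Spark's UI and debug output use to describe the RDD:

sc.setCallSite("ETL step 3: parse logs")
val parsed = sc.parallelize(Seq("a", "b")).map(_.toUpperCase)  // creation site: "ETL step 3: parse logs"
sc.clearCallSite()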

core/src/main/scala/org/apache/spark/util/Utils.scala

Lines changed: 20 additions & 7 deletions
@@ -49,6 +49,11 @@ import org.apache.spark.serializer.{DeserializationStream, SerializationStream,
 /** CallSite represents a place in user code. It can have a short and a long form. */
 private[spark] case class CallSite(shortForm: String, longForm: String)
 
+private[spark] object CallSite {
+  val SHORT_FORM = "callSite.short"
+  val LONG_FORM = "callSite.long"
+}
+
 /**
  * Various utility methods used by Spark.
  */
@@ -859,18 +864,26 @@ private[spark] object Utils extends Logging {
     }
   }
 
-  /**
-   * A regular expression to match classes of the "core" Spark API that we want to skip when
-   * finding the call site of a method.
-   */
-  private val SPARK_CLASS_REGEX = """^org\.apache\.spark(\.api\.java)?(\.util)?(\.rdd)?\.[A-Z]""".r
+  /** Default filtering function for finding call sites using `getCallSite`. */
+  private def coreExclusionFunction(className: String): Boolean = {
+    // A regular expression to match classes of the "core" Spark API that we want to skip when
+    // finding the call site of a method.
+    val SPARK_CORE_CLASS_REGEX = """^org\.apache\.spark(\.api\.java)?(\.util)?(\.rdd)?\.[A-Z]""".r
+    val SCALA_CLASS_REGEX = """^scala""".r
+    val isSparkCoreClass = SPARK_CORE_CLASS_REGEX.findFirstIn(className).isDefined
+    val isScalaClass = SCALA_CLASS_REGEX.findFirstIn(className).isDefined
+    // If the class is a Spark internal class or a Scala class, then exclude.
+    isSparkCoreClass || isScalaClass
+  }
 
   /**
    * When called inside a class in the spark package, returns the name of the user code class
    * (outside the spark package) that called into Spark, as well as which Spark method they called.
    * This is used, for example, to tell users where in their code each RDD got created.
+   *
+   * @param skipClass Function that is used to exclude non-user-code classes.
    */
-  def getCallSite: CallSite = {
+  def getCallSite(skipClass: String => Boolean = coreExclusionFunction): CallSite = {
     val trace = Thread.currentThread.getStackTrace()
       .filterNot { ste: StackTraceElement =>
         // When running under some profilers, the current stack trace might contain some bogus
@@ -891,7 +904,7 @@ private[spark] object Utils extends Logging {
 
     for (el <- trace) {
       if (insideSpark) {
-        if (SPARK_CLASS_REGEX.findFirstIn(el.getClassName).isDefined) {
+        if (skipClass(el.getClassName)) {
          lastSparkMethod = if (el.getMethodName == "<init>") {
            // Spark method is a constructor; get its class name
            el.getClassName.substring(el.getClassName.lastIndexOf('.') + 1)
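
With skipClass now a parameter (defaulting to coreExclusionFunction), other layers can widen what counts as an internal frame when locating the user call site. A hypothetical sketch of a caller inside the org.apache.spark namespace (Utils is private[spark]); the library package name is made up and the predicate is deliberately simplified compared to the regex above:

// Treat Spark classes and a custom library's classes as internal frames.
def myExclusion(className: String): Boolean =
  className.startsWith("org.apache.spark") || className.startsWith("com.example.mylib")

val site = Utils.getCallSite(skipClass = myExclusion)
println(site.shortForm)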

core/src/test/java/org/apache/spark/JavaAPISuite.java

Lines changed: 26 additions & 0 deletions
@@ -1307,4 +1307,30 @@ public void collectUnderlyingScalaRDD() {
     SomeCustomClass[] collected = (SomeCustomClass[]) rdd.rdd().retag(SomeCustomClass.class).collect();
     Assert.assertEquals(data.size(), collected.length);
   }
+
+  /**
+   * Test for SPARK-3647. This test needs to use the maven-built assembly to trigger the issue,
+   * since that's the only artifact where Guava classes have been relocated.
+   */
+  @Test
+  public void testGuavaOptional() {
+    // Stop the context created in setUp() and start a local-cluster one, to force usage of the
+    // assembly.
+    sc.stop();
+    JavaSparkContext localCluster = new JavaSparkContext("local-cluster[1,1,512]", "JavaAPISuite");
+    try {
+      JavaRDD<Integer> rdd1 = localCluster.parallelize(Arrays.asList(1, 2, null), 3);
+      JavaRDD<Optional<Integer>> rdd2 = rdd1.map(
+        new Function<Integer, Optional<Integer>>() {
+          @Override
+          public Optional<Integer> call(Integer i) {
+            return Optional.fromNullable(i);
+          }
+        });
+      rdd2.collect();
+    } finally {
+      localCluster.stop();
+    }
+  }
+
 }

dev/run-tests-jenkins

Lines changed: 2 additions & 2 deletions
@@ -92,13 +92,13 @@ function post_message () {
   merge_note=" * This patch merges cleanly."
 
   source_files=$(
-    git diff master --name-only \
+    git diff master... --name-only `# diff patch against master from branch point` \
     | grep -v -e "\/test" `# ignore files in test directories` \
     | grep -e "\.py$" -e "\.java$" -e "\.scala$" `# include only code files` \
     | tr "\n" " "
   )
   new_public_classes=$(
-    git diff master ${source_files} `# diff this patch against master and...` \
+    git diff master... ${source_files} `# diff patch against master from branch point` \
     | grep "^\+" `# filter in only added lines` \
     | sed -r -e "s/^\+//g" `# remove the leading +` \
     | grep -e "trait " -e "class " `# filter in lines with these key words` \

docs/mllib-feature-extraction.md

Lines changed: 1 addition & 1 deletion
@@ -68,7 +68,7 @@ val sc: SparkContext = ...
 val documents: RDD[Seq[String]] = sc.textFile("...").map(_.split(" ").toSeq)
 
 val hashingTF = new HashingTF()
-val tf: RDD[Vector] = hasingTF.transform(documents)
+val tf: RDD[Vector] = hashingTF.transform(documents)
 {% endhighlight %}
 
 While applying `HashingTF` only needs a single pass to the data, applying `IDF` needs two passes:
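
For context, the corrected snippet sits inside the doc's TF-IDF example; the second pass the text refers to is the IDF fit over the cached term frequencies. A hedged sketch combining both steps, assuming an existing SparkContext named sc and a whitespace-tokenized input file (path made up):

import org.apache.spark.mllib.feature.{HashingTF, IDF}
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.rdd.RDD

val documents: RDD[Seq[String]] = sc.textFile("data/docs.txt").map(_.split(" ").toSeq)
val hashingTF = new HashingTF()
val tf: RDD[Vector] = hashingTF.transform(documents)  // pass 1: hashed term frequencies
tf.cache()
val idf = new IDF().fit(tf)                           // pass 2: document frequencies
val tfidf: RDD[Vector] = idf.transform(tf)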
