
Commit df098fc

Merge branch 'master' into giwa
2 parents: 550dfd9 + 8ca4ecb

96 files changed: +2231 additions, -982 deletions (large commit; some file diffs are hidden below)

.gitignore

Lines changed: 1 addition & 0 deletions

@@ -6,6 +6,7 @@
 *.iml
 *.iws
 .idea/
+.idea_modules/
 sbt/*.jar
 .settings
 .cache

assembly/pom.xml

Lines changed: 3 additions & 1 deletion

@@ -141,7 +141,9 @@
         <include>com.google.common.**</include>
       </includes>
       <excludes>
-        <exclude>com.google.common.base.Optional**</exclude>
+        <exclude>com/google/common/base/Absent*</exclude>
+        <exclude>com/google/common/base/Optional*</exclude>
+        <exclude>com/google/common/base/Present*</exclude>
       </excludes>
     </relocation>
   </relocations>

core/pom.xml

Lines changed: 2 additions & 0 deletions

@@ -343,7 +343,9 @@
       <filter>
         <artifact>com.google.guava:guava</artifact>
         <includes>
+          <include>com/google/common/base/Absent*</include>
           <include>com/google/common/base/Optional*</include>
+          <include>com/google/common/base/Present*</include>
         </includes>
       </filter>
     </filters>
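
Both pom changes serve the same goal: Guava's Optional stays in Spark's public Java API, so its concrete subclasses Absent and Present have to travel with it, unrelocated, in the assembly. A minimal sketch of why (it assumes only that Guava is on the classpath; this is not code from the commit):

import com.google.common.base.Optional

// Optional itself is abstract; its factory methods hand back instances of the
// Absent and Present subclasses, so bundling Optional without them would fail
// at runtime with NoClassDefFoundError even though Optional itself resolves.
val some: Optional[String] = Optional.of("value")    // an instance of Present
val none: Optional[String] = Optional.absent()       // an instance of Absent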

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 22 additions & 10 deletions

@@ -1030,28 +1030,40 @@ class SparkContext(config: SparkConf) extends Logging {
   }

   /**
-   * Support function for API backtraces.
+   * Set the thread-local property for overriding the call sites
+   * of actions and RDDs.
    */
-  def setCallSite(site: String) {
-    setLocalProperty("externalCallSite", site)
+  def setCallSite(shortCallSite: String) {
+    setLocalProperty(CallSite.SHORT_FORM, shortCallSite)
   }

   /**
-   * Support function for API backtraces.
+   * Set the thread-local property for overriding the call sites
+   * of actions and RDDs.
+   */
+  private[spark] def setCallSite(callSite: CallSite) {
+    setLocalProperty(CallSite.SHORT_FORM, callSite.shortForm)
+    setLocalProperty(CallSite.LONG_FORM, callSite.longForm)
+  }
+
+  /**
+   * Clear the thread-local property for overriding the call sites
+   * of actions and RDDs.
    */
   def clearCallSite() {
-    setLocalProperty("externalCallSite", null)
+    setLocalProperty(CallSite.SHORT_FORM, null)
+    setLocalProperty(CallSite.LONG_FORM, null)
   }

   /**
    * Capture the current user callsite and return a formatted version for printing. If the user
-   * has overridden the call site, this will return the user's version.
+   * has overridden the call site using `setCallSite()`, this will return the user's version.
    */
   private[spark] def getCallSite(): CallSite = {
-    Option(getLocalProperty("externalCallSite")) match {
-      case Some(callSite) => CallSite(callSite, longForm = "")
-      case None => Utils.getCallSite
-    }
+    Option(getLocalProperty(CallSite.SHORT_FORM)).map { case shortCallSite =>
+      val longCallSite = Option(getLocalProperty(CallSite.LONG_FORM)).getOrElse("")
+      CallSite(shortCallSite, longCallSite)
+    }.getOrElse(Utils.getCallSite())
  }

   /**
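
A short usage sketch of the thread-local override: a wrapper layer (such as the Java or Python APIs) can pin the call site shown in the UI for the actions it triggers, then restore normal capture. The caller name and path below are illustrative, not part of this commit; `sc` is an existing SparkContext.

// Hypothetical caller; "myLib.loadTable" and the input path are illustrative only.
sc.setCallSite("myLib.loadTable")      // short form reported for jobs/RDDs created below
val data = sc.textFile("hdfs://...")   // the RDD's creationSite now reflects the override
data.count()
sc.clearCallSite()                     // later actions capture the real user call site again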

core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala

Lines changed: 1 addition & 1 deletion

@@ -116,7 +116,7 @@ class SparkHadoopWriter(@transient jobConf: JobConf)
        }
      }
    } else {
-      logWarning ("No need to commit output of task: " + taID.value)
+      logInfo ("No need to commit output of task: " + taID.value)
    }
  }

core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala

Lines changed: 48 additions & 0 deletions

@@ -469,6 +469,22 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
     fromRDD(joinResult.mapValues{case (v, w) => (JavaUtils.optionToOptional(v), w)})
   }

+  /**
+   * Perform a full outer join of `this` and `other`. For each element (k, v) in `this`, the
+   * resulting RDD will either contain all pairs (k, (Some(v), Some(w))) for w in `other`, or
+   * the pair (k, (Some(v), None)) if no elements in `other` have key k. Similarly, for each
+   * element (k, w) in `other`, the resulting RDD will either contain all pairs
+   * (k, (Some(v), Some(w))) for v in `this`, or the pair (k, (None, Some(w))) if no elements
+   * in `this` have key k. Uses the given Partitioner to partition the output RDD.
+   */
+  def fullOuterJoin[W](other: JavaPairRDD[K, W], partitioner: Partitioner)
+      : JavaPairRDD[K, (Optional[V], Optional[W])] = {
+    val joinResult = rdd.fullOuterJoin(other, partitioner)
+    fromRDD(joinResult.mapValues{ case (v, w) =>
+      (JavaUtils.optionToOptional(v), JavaUtils.optionToOptional(w))
+    })
+  }
+
   /**
    * Simplified version of combineByKey that hash-partitions the resulting RDD using the existing
    * partitioner/parallelism level.

@@ -563,6 +579,38 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
     fromRDD(joinResult.mapValues{case (v, w) => (JavaUtils.optionToOptional(v), w)})
   }

+  /**
+   * Perform a full outer join of `this` and `other`. For each element (k, v) in `this`, the
+   * resulting RDD will either contain all pairs (k, (Some(v), Some(w))) for w in `other`, or
+   * the pair (k, (Some(v), None)) if no elements in `other` have key k. Similarly, for each
+   * element (k, w) in `other`, the resulting RDD will either contain all pairs
+   * (k, (Some(v), Some(w))) for v in `this`, or the pair (k, (None, Some(w))) if no elements
+   * in `this` have key k. Hash-partitions the resulting RDD using the existing partitioner/
+   * parallelism level.
+   */
+  def fullOuterJoin[W](other: JavaPairRDD[K, W]): JavaPairRDD[K, (Optional[V], Optional[W])] = {
+    val joinResult = rdd.fullOuterJoin(other)
+    fromRDD(joinResult.mapValues{ case (v, w) =>
+      (JavaUtils.optionToOptional(v), JavaUtils.optionToOptional(w))
+    })
+  }
+
+  /**
+   * Perform a full outer join of `this` and `other`. For each element (k, v) in `this`, the
+   * resulting RDD will either contain all pairs (k, (Some(v), Some(w))) for w in `other`, or
+   * the pair (k, (Some(v), None)) if no elements in `other` have key k. Similarly, for each
+   * element (k, w) in `other`, the resulting RDD will either contain all pairs
+   * (k, (Some(v), Some(w))) for v in `this`, or the pair (k, (None, Some(w))) if no elements
+   * in `this` have key k. Hash-partitions the resulting RDD into the given number of partitions.
+   */
+  def fullOuterJoin[W](other: JavaPairRDD[K, W], numPartitions: Int)
+      : JavaPairRDD[K, (Optional[V], Optional[W])] = {
+    val joinResult = rdd.fullOuterJoin(other, numPartitions)
+    fromRDD(joinResult.mapValues{ case (v, w) =>
+      (JavaUtils.optionToOptional(v), JavaUtils.optionToOptional(w))
+    })
+  }
+
   /**
    * Return the key-value pairs in this RDD to the master as a Map.
    */
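
On the Java side, the missing half of a full outer join surfaces as Guava's Optional rather than null. A brief sketch of the behaviour (written in Scala against the Java wrapper; the object name, sample data, and local master are illustrative, not from this commit):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.api.java.JavaPairRDD

object FullOuterJoinJavaApiSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("sketch"))

    // JavaPairRDD.fromRDD wraps a Scala pair RDD in the Java-friendly API.
    val left  = JavaPairRDD.fromRDD(sc.parallelize(Seq(("a", 1), ("b", 2))))
    val right = JavaPairRDD.fromRDD(sc.parallelize(Seq(("b", "x"), ("c", "y"))))

    // A side with no match comes back as Optional.absent() rather than null, e.g.:
    //   (a, (Optional.of(1), Optional.absent()))
    //   (b, (Optional.of(2), Optional.of(x)))
    //   (c, (Optional.absent(), Optional.of(y)))
    println(left.fullOuterJoin(right).collect())

    sc.stop()
  }
}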

core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala

Lines changed: 1 addition & 1 deletion

@@ -280,7 +280,7 @@ object SparkSubmit {
     }

     // Read from default spark properties, if any
-    for ((k, v) <- args.getDefaultSparkProperties) {
+    for ((k, v) <- args.defaultSparkProperties) {
       sysProps.getOrElseUpdate(k, v)
     }


core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala

Lines changed: 13 additions & 8 deletions

@@ -57,12 +57,8 @@ private[spark] class SparkSubmitArguments(args: Seq[String]) {
   var pyFiles: String = null
   val sparkProperties: HashMap[String, String] = new HashMap[String, String]()

-  parseOpts(args.toList)
-  mergeSparkProperties()
-  checkRequiredArguments()
-
-  /** Return default present in the currently defined defaults file. */
-  def getDefaultSparkProperties = {
+  /** Default properties present in the currently defined defaults file. */
+  lazy val defaultSparkProperties: HashMap[String, String] = {
     val defaultProperties = new HashMap[String, String]()
     if (verbose) SparkSubmit.printStream.println(s"Using properties file: $propertiesFile")
     Option(propertiesFile).foreach { filename =>

@@ -79,6 +75,14 @@ private[spark] class SparkSubmitArguments(args: Seq[String]) {
     defaultProperties
   }

+  // Respect SPARK_*_MEMORY for cluster mode
+  driverMemory = sys.env.get("SPARK_DRIVER_MEMORY").orNull
+  executorMemory = sys.env.get("SPARK_EXECUTOR_MEMORY").orNull
+
+  parseOpts(args.toList)
+  mergeSparkProperties()
+  checkRequiredArguments()
+
   /**
    * Fill in any undefined values based on the default properties file or options passed in through
    * the '--conf' flag.

@@ -107,7 +111,8 @@ private[spark] class SparkSubmitArguments(args: Seq[String]) {
     }
   }

-  val properties = getDefaultSparkProperties
+  val properties = HashMap[String, String]()
+  properties.putAll(defaultSparkProperties)
   properties.putAll(sparkProperties)

   // Use properties file as fallback for values which have a direct analog to

@@ -213,7 +218,7 @@ private[spark] class SparkSubmitArguments(args: Seq[String]) {
     | verbose $verbose
     |
     |Default properties from $propertiesFile:
-    |${getDefaultSparkProperties.mkString(" ", "\n ", "\n")}
+    |${defaultSparkProperties.mkString(" ", "\n ", "\n")}
    """.stripMargin
  }

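
The reordering works because defaultSparkProperties is now a lazy val: the defaults file is read only on first access, after parseOpts has filled in verbose and propertiesFile. A stripped-down, hypothetical stand-in (not the real SparkSubmitArguments) that illustrates just that initialization order:

// Hypothetical stand-in: a lazy val declared above the constructor statements is
// still evaluated only when first touched, so it sees the parsed flag values.
class ArgsSketch(args: Seq[String]) {
  var verbose = false
  var propertiesFile: String = null

  lazy val defaults: Map[String, String] = {
    if (verbose) println(s"Using properties file: $propertiesFile")
    Map.empty // the real code loads key/value pairs from propertiesFile here
  }

  // Constructor body: parse the flags first...
  args.sliding(2, 2).foreach {
    case Seq("--verbose", v)         => verbose = v.toBoolean
    case Seq("--properties-file", f) => propertiesFile = f
    case _                           =>
  }
  // ...then the first access to `defaults` observes the parsed values.
  println(defaults)
}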

core/src/main/scala/org/apache/spark/executor/Executor.scala

Lines changed: 11 additions & 5 deletions

@@ -24,6 +24,7 @@ import java.util.concurrent._

 import scala.collection.JavaConversions._
 import scala.collection.mutable.{ArrayBuffer, HashMap}
+import scala.util.control.NonFatal

 import org.apache.spark._
 import org.apache.spark.deploy.SparkHadoopUtil

@@ -375,12 +376,17 @@ private[spark] class Executor(
         }

         val message = Heartbeat(executorId, tasksMetrics.toArray, env.blockManager.blockManagerId)
-        val response = AkkaUtils.askWithReply[HeartbeatResponse](message, heartbeatReceiverRef,
-          retryAttempts, retryIntervalMs, timeout)
-        if (response.reregisterBlockManager) {
-          logWarning("Told to re-register on heartbeat")
-          env.blockManager.reregister()
+        try {
+          val response = AkkaUtils.askWithReply[HeartbeatResponse](message, heartbeatReceiverRef,
+            retryAttempts, retryIntervalMs, timeout)
+          if (response.reregisterBlockManager) {
+            logWarning("Told to re-register on heartbeat")
+            env.blockManager.reregister()
+          }
+        } catch {
+          case NonFatal(t) => logWarning("Issue communicating with driver in heartbeater", t)
         }
+
         Thread.sleep(interval)
       }
     }
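
Catching NonFatal keeps the heartbeater loop alive across transient driver failures while still letting genuinely fatal errors (OutOfMemoryError, InterruptedException, and the like) escape. A minimal, self-contained sketch of the pattern (the function and variable names are illustrative, not the Executor's API):

import scala.util.control.NonFatal

// Retry-loop pattern: a transient failure is logged and the loop keeps going;
// fatal errors are not matched by NonFatal and propagate out of the loop.
def keepAlive(intervalMs: Long)(beat: () => Unit): Unit = {
  while (!Thread.currentThread().isInterrupted) {
    try {
      beat()
    } catch {
      case NonFatal(t) => println(s"heartbeat failed, will retry: ${t.getMessage}")
    }
    Thread.sleep(intervalMs)
  }
}

// Usage (hypothetical): keepAlive(10000)(() => sendHeartbeatToDriver())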

core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala

Lines changed: 0 additions & 6 deletions

@@ -137,7 +137,6 @@ class TaskMetrics extends Serializable {
       merged.localBlocksFetched += depMetrics.localBlocksFetched
       merged.remoteBlocksFetched += depMetrics.remoteBlocksFetched
       merged.remoteBytesRead += depMetrics.remoteBytesRead
-      merged.shuffleFinishTime = math.max(merged.shuffleFinishTime, depMetrics.shuffleFinishTime)
     }
     _shuffleReadMetrics = Some(merged)
   }

@@ -177,11 +176,6 @@ case class InputMetrics(readMethod: DataReadMethod.Value) {
    */
 @DeveloperApi
 class ShuffleReadMetrics extends Serializable {
-  /**
-   * Absolute time when this task finished reading shuffle data
-   */
-  var shuffleFinishTime: Long = -1
-
   /**
    * Number of blocks fetched in this shuffle by this task (remote or local)
    */

core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala

Lines changed: 48 additions & 1 deletion

@@ -506,6 +506,23 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
     }
   }

+  /**
+   * Perform a full outer join of `this` and `other`. For each element (k, v) in `this`, the
+   * resulting RDD will either contain all pairs (k, (Some(v), Some(w))) for w in `other`, or
+   * the pair (k, (Some(v), None)) if no elements in `other` have key k. Similarly, for each
+   * element (k, w) in `other`, the resulting RDD will either contain all pairs
+   * (k, (Some(v), Some(w))) for v in `this`, or the pair (k, (None, Some(w))) if no elements
+   * in `this` have key k. Uses the given Partitioner to partition the output RDD.
+   */
+  def fullOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner)
+      : RDD[(K, (Option[V], Option[W]))] = {
+    this.cogroup(other, partitioner).flatMapValues {
+      case (vs, Seq()) => vs.map(v => (Some(v), None))
+      case (Seq(), ws) => ws.map(w => (None, Some(w)))
+      case (vs, ws) => for (v <- vs; w <- ws) yield (Some(v), Some(w))
+    }
+  }
+
   /**
    * Simplified version of combineByKey that hash-partitions the resulting RDD using the
    * existing partitioner/parallelism level.

@@ -585,6 +602,31 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
     rightOuterJoin(other, new HashPartitioner(numPartitions))
   }

+  /**
+   * Perform a full outer join of `this` and `other`. For each element (k, v) in `this`, the
+   * resulting RDD will either contain all pairs (k, (Some(v), Some(w))) for w in `other`, or
+   * the pair (k, (Some(v), None)) if no elements in `other` have key k. Similarly, for each
+   * element (k, w) in `other`, the resulting RDD will either contain all pairs
+   * (k, (Some(v), Some(w))) for v in `this`, or the pair (k, (None, Some(w))) if no elements
+   * in `this` have key k. Hash-partitions the resulting RDD using the existing partitioner/
+   * parallelism level.
+   */
+  def fullOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (Option[V], Option[W]))] = {
+    fullOuterJoin(other, defaultPartitioner(self, other))
+  }
+
+  /**
+   * Perform a full outer join of `this` and `other`. For each element (k, v) in `this`, the
+   * resulting RDD will either contain all pairs (k, (Some(v), Some(w))) for w in `other`, or
+   * the pair (k, (Some(v), None)) if no elements in `other` have key k. Similarly, for each
+   * element (k, w) in `other`, the resulting RDD will either contain all pairs
+   * (k, (Some(v), Some(w))) for v in `this`, or the pair (k, (None, Some(w))) if no elements
+   * in `this` have key k. Hash-partitions the resulting RDD into the given number of partitions.
+   */
+  def fullOuterJoin[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (Option[V], Option[W]))] = {
+    fullOuterJoin(other, new HashPartitioner(numPartitions))
+  }
+
   /**
    * Return the key-value pairs in this RDD to the master as a Map.
    *

@@ -872,7 +914,12 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
       hadoopConf.set("mapred.output.compression.codec", c.getCanonicalName)
       hadoopConf.set("mapred.output.compression.type", CompressionType.BLOCK.toString)
     }
-    hadoopConf.setOutputCommitter(classOf[FileOutputCommitter])
+
+    // Use configured output committer if already set
+    if (conf.getOutputCommitter == null) {
+      hadoopConf.setOutputCommitter(classOf[FileOutputCommitter])
+    }
+
     FileOutputFormat.setOutputPath(hadoopConf,
       SparkHadoopWriter.createPathFromString(path, hadoopConf))
     saveAsHadoopDataset(hadoopConf)
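
The implementation rides on cogroup: for every key, the two value collections are expanded into Option pairs, with an empty side turning into None. A Spark-free sketch of that per-key expansion, pasteable into the Scala REPL (the helper name and literal data are illustrative):

// `vs` and `ws` stand in for the two cogrouped value collections of one key.
def expand[V, W](vs: Seq[V], ws: Seq[W]): Seq[(Option[V], Option[W])] = (vs, ws) match {
  case (vs, Seq()) => vs.map(v => (Some(v), None))      // key only on the left
  case (Seq(), ws) => ws.map(w => (None, Some(w)))      // key only on the right
  case (vs, ws)    => for (v <- vs; w <- ws) yield (Some(v), Some(w))  // cross product
}

expand(Seq(1, 2), Seq("x"))        // Seq((Some(1),Some(x)), (Some(2),Some(x)))
expand(Seq(1), Seq.empty[String])  // Seq((Some(1),None))
expand(Seq.empty[Int], Seq("y"))   // Seq((None,Some(y)))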

core/src/main/scala/org/apache/spark/rdd/RDD.scala

Lines changed: 4 additions & 3 deletions

@@ -17,7 +17,7 @@

 package org.apache.spark.rdd

-import java.util.Random
+import java.util.{Properties, Random}

 import scala.collection.{mutable, Map}
 import scala.collection.mutable.ArrayBuffer

@@ -41,7 +41,7 @@ import org.apache.spark.partial.CountEvaluator
 import org.apache.spark.partial.GroupedCountEvaluator
 import org.apache.spark.partial.PartialResult
 import org.apache.spark.storage.StorageLevel
-import org.apache.spark.util.{BoundedPriorityQueue, Utils}
+import org.apache.spark.util.{BoundedPriorityQueue, Utils, CallSite}
 import org.apache.spark.util.collection.OpenHashMap
 import org.apache.spark.util.random.{BernoulliSampler, PoissonSampler, SamplingUtils}

@@ -1224,7 +1224,8 @@ abstract class RDD[T: ClassTag](
   private var storageLevel: StorageLevel = StorageLevel.NONE

   /** User code that created this RDD (e.g. `textFile`, `parallelize`). */
-  @transient private[spark] val creationSite = Utils.getCallSite
+  @transient private[spark] val creationSite = sc.getCallSite()
+
   private[spark] def getCreationSite: String = Option(creationSite).map(_.shortForm).getOrElse("")

   private[spark] def elementClassTag: ClassTag[T] = classTag[T]

core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala

Lines changed: 0 additions & 1 deletion

@@ -171,7 +171,6 @@ class JobLogger(val user: String, val logDirName: String) extends SparkListener
     }
     val shuffleReadMetrics = taskMetrics.shuffleReadMetrics match {
       case Some(metrics) =>
-        " SHUFFLE_FINISH_TIME=" + metrics.shuffleFinishTime +
         " BLOCK_FETCHED_TOTAL=" + metrics.totalBlocksFetched +
         " BLOCK_FETCHED_LOCAL=" + metrics.localBlocksFetched +
        " BLOCK_FETCHED_REMOTE=" + metrics.remoteBlocksFetched +

core/src/main/scala/org/apache/spark/util/JsonProtocol.scala

Lines changed: 0 additions & 2 deletions

@@ -255,7 +255,6 @@ private[spark] object JsonProtocol {
   }

   def shuffleReadMetricsToJson(shuffleReadMetrics: ShuffleReadMetrics): JValue = {
-    ("Shuffle Finish Time" -> shuffleReadMetrics.shuffleFinishTime) ~
     ("Remote Blocks Fetched" -> shuffleReadMetrics.remoteBlocksFetched) ~
     ("Local Blocks Fetched" -> shuffleReadMetrics.localBlocksFetched) ~
     ("Fetch Wait Time" -> shuffleReadMetrics.fetchWaitTime) ~

@@ -590,7 +589,6 @@ private[spark] object JsonProtocol {

   def shuffleReadMetricsFromJson(json: JValue): ShuffleReadMetrics = {
     val metrics = new ShuffleReadMetrics
-    metrics.shuffleFinishTime = (json \ "Shuffle Finish Time").extract[Long]
     metrics.remoteBlocksFetched = (json \ "Remote Blocks Fetched").extract[Int]
     metrics.localBlocksFetched = (json \ "Local Blocks Fetched").extract[Int]
     metrics.fetchWaitTime = (json \ "Fetch Wait Time").extract[Long]

0 commit comments
