Commit 063ea0b

Merge branch 'master' into randomRDD
2 parents aec68eb + c3462c6 commit 063ea0b

File tree

185 files changed: +1276 additions, -280 deletions


.gitignore

Lines changed: 1 addition & 0 deletions
@@ -19,6 +19,7 @@ conf/spark-env.sh
 conf/streaming-env.sh
 conf/log4j.properties
 conf/spark-defaults.conf
+conf/hive-site.xml
 docs/_site
 docs/api
 target/

core/src/main/scala/org/apache/spark/Partitioner.scala

Lines changed: 2 additions & 2 deletions
@@ -134,8 +134,8 @@ class RangePartitioner[K : Ordering : ClassTag, V](
   def getPartition(key: Any): Int = {
     val k = key.asInstanceOf[K]
     var partition = 0
-    if (rangeBounds.length < 1000) {
-      // If we have less than 100 partitions naive search
+    if (rangeBounds.length <= 128) {
+      // If we have less than 128 partitions naive search
       while (partition < rangeBounds.length && ordering.gt(k, rangeBounds(partition))) {
         partition += 1
       }
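The hunk above tightens the point at which RangePartitioner stops doing a naive linear scan over its range bounds (and fixes the stale comment); the binary-search branch itself sits outside the hunk. A minimal, self-contained sketch of the same lookup pattern, with hypothetical names rather than the actual Spark implementation:

    // Illustrative sketch only: linear scan for small bound arrays,
    // lower-bound binary search for larger ones, mirroring the
    // `rangeBounds.length <= 128` threshold in the diff above.
    object RangeLookupSketch {
      def findPartition[K](key: K, rangeBounds: Array[K])(implicit ord: Ordering[K]): Int = {
        if (rangeBounds.length <= 128) {
          // Small arrays: a simple scan is cheap and cache-friendly.
          var partition = 0
          while (partition < rangeBounds.length && ord.gt(key, rangeBounds(partition))) {
            partition += 1
          }
          partition
        } else {
          // Larger arrays: binary search for the first bound >= key.
          var lo = 0
          var hi = rangeBounds.length
          while (lo < hi) {
            val mid = (lo + hi) >>> 1
            if (ord.gt(key, rangeBounds(mid))) lo = mid + 1 else hi = mid
          }
          lo
        }
      }
    }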

core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala

Lines changed: 1 addition & 2 deletions
@@ -99,7 +99,6 @@ class TaskMetrics extends Serializable {
         existingMetrics.fetchWaitTime += newMetrics.fetchWaitTime
         existingMetrics.localBlocksFetched += newMetrics.localBlocksFetched
         existingMetrics.remoteBlocksFetched += newMetrics.remoteBlocksFetched
-        existingMetrics.totalBlocksFetched += newMetrics.totalBlocksFetched
         existingMetrics.remoteBytesRead += newMetrics.remoteBytesRead
       case None =>
         _shuffleReadMetrics = Some(newMetrics)
@@ -149,7 +148,7 @@ class ShuffleReadMetrics extends Serializable {
   /**
    * Number of blocks fetched in this shuffle by this task (remote or local)
    */
-  var totalBlocksFetched: Int = _
+  def totalBlocksFetched: Int = remoteBlocksFetched + localBlocksFetched

   /**
    * Number of remote blocks fetched in this shuffle by this task
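Together, the two hunks above replace a separately accumulated totalBlocksFetched var with a value derived from the counters it summarizes, which is why the extra += line in the merge path can be dropped. A small sketch of the pattern, using a hypothetical class rather than the Spark one:

    // "Derive, don't store": the total can never drift out of sync with
    // the counters it is computed from, and never needs a separate merge.
    class FetchCounters extends Serializable {
      var localBlocksFetched: Int = 0
      var remoteBlocksFetched: Int = 0

      def totalBlocksFetched: Int = localBlocksFetched + remoteBlocksFetched

      def merge(other: FetchCounters): Unit = {
        localBlocksFetched += other.localBlocksFetched
        remoteBlocksFetched += other.remoteBlocksFetched
        // No update needed for totalBlocksFetched.
      }
    }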

core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala

Lines changed: 2 additions & 2 deletions
@@ -170,12 +170,12 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part:

     val createCombiner: (CoGroupValue => CoGroupCombiner) = value => {
       val newCombiner = Array.fill(numRdds)(new CoGroup)
-      value match { case (v, depNum) => newCombiner(depNum) += v }
+      newCombiner(value._2) += value._1
       newCombiner
     }
     val mergeValue: (CoGroupCombiner, CoGroupValue) => CoGroupCombiner =
       (combiner, value) => {
-        value match { case (v, depNum) => combiner(depNum) += v }
+        combiner(value._2) += value._1
         combiner
       }
     val mergeCombiners: (CoGroupCombiner, CoGroupCombiner) => CoGroupCombiner =
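This hunk, like the MappedValuesRDD and PairRDDFunctions changes below, swaps a per-record match on a Tuple2 for direct _1/_2 accessors, presumably as a micro-optimization on hot aggregation paths. A tiny illustrative sketch of the two equivalent forms:

    // Both forms compute the same result; the second reads the tuple fields
    // directly instead of destructuring through a match per element, which is
    // the style this commit adopts on hot paths.
    val pairs: Seq[(String, Int)] = Seq(("a", 1), ("b", 2))

    val sumMatched  = pairs.map { case (_, v) => v }.sum   // style being removed
    val sumAccessed = pairs.map(pair => pair._2).sum       // style being introduced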

core/src/main/scala/org/apache/spark/rdd/MappedValuesRDD.scala

Lines changed: 1 addition & 1 deletion
@@ -28,6 +28,6 @@ class MappedValuesRDD[K, V, U](prev: RDD[_ <: Product2[K, V]], f: V => U)
   override val partitioner = firstParent[Product2[K, U]].partitioner

   override def compute(split: Partition, context: TaskContext): Iterator[(K, U)] = {
-    firstParent[Product2[K, V]].iterator(split, context).map { case Product2(k ,v) => (k, f(v)) }
+    firstParent[Product2[K, V]].iterator(split, context).map { pair => (pair._1, f(pair._2)) }
   }
 }

core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala

Lines changed: 30 additions & 30 deletions
@@ -216,17 +216,17 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])

     val reducePartition = (iter: Iterator[(K, V)]) => {
       val map = new JHashMap[K, V]
-      iter.foreach { case (k, v) =>
-        val old = map.get(k)
-        map.put(k, if (old == null) v else func(old, v))
+      iter.foreach { pair =>
+        val old = map.get(pair._1)
+        map.put(pair._1, if (old == null) pair._2 else func(old, pair._2))
       }
       Iterator(map)
     } : Iterator[JHashMap[K, V]]

     val mergeMaps = (m1: JHashMap[K, V], m2: JHashMap[K, V]) => {
-      m2.foreach { case (k, v) =>
-        val old = m1.get(k)
-        m1.put(k, if (old == null) v else func(old, v))
+      m2.foreach { pair =>
+        val old = m1.get(pair._1)
+        m1.put(pair._1, if (old == null) pair._2 else func(old, pair._2))
       }
       m1
     } : JHashMap[K, V]
@@ -401,9 +401,9 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
    * (k, v2) is in `other`. Uses the given Partitioner to partition the output RDD.
    */
   def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))] = {
-    this.cogroup(other, partitioner).flatMapValues { case (vs, ws) =>
-      for (v <- vs; w <- ws) yield (v, w)
-    }
+    this.cogroup(other, partitioner).flatMapValues( pair =>
+      for (v <- pair._1; w <- pair._2) yield (v, w)
+    )
   }

   /**
@@ -413,11 +413,11 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
    * partition the output RDD.
    */
   def leftOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, Option[W]))] = {
-    this.cogroup(other, partitioner).flatMapValues { case (vs, ws) =>
-      if (ws.isEmpty) {
-        vs.map(v => (v, None))
+    this.cogroup(other, partitioner).flatMapValues { pair =>
+      if (pair._2.isEmpty) {
+        pair._1.map(v => (v, None))
       } else {
-        for (v <- vs; w <- ws) yield (v, Some(w))
+        for (v <- pair._1; w <- pair._2) yield (v, Some(w))
       }
     }
   }
@@ -430,11 +430,11 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
    */
   def rightOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner)
       : RDD[(K, (Option[V], W))] = {
-    this.cogroup(other, partitioner).flatMapValues { case (vs, ws) =>
-      if (vs.isEmpty) {
-        ws.map(w => (None, w))
+    this.cogroup(other, partitioner).flatMapValues { pair =>
+      if (pair._1.isEmpty) {
+        pair._2.map(w => (None, w))
       } else {
-        for (v <- vs; w <- ws) yield (Some(v), w)
+        for (v <- pair._1; w <- pair._2) yield (Some(v), w)
       }
     }
   }
@@ -535,7 +535,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
     val data = self.collect()
     val map = new mutable.HashMap[K, V]
     map.sizeHint(data.length)
-    data.foreach { case (k, v) => map.put(k, v) }
+    data.foreach { pair => map.put(pair._1, pair._2) }
     map
   }

@@ -572,10 +572,10 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
     }
     val cg = new CoGroupedRDD[K](Seq(self, other1, other2, other3), partitioner)
     cg.mapValues { case Seq(vs, w1s, w2s, w3s) =>
-       (vs.asInstanceOf[Seq[V]],
-         w1s.asInstanceOf[Seq[W1]],
-         w2s.asInstanceOf[Seq[W2]],
-         w3s.asInstanceOf[Seq[W3]])
+      (vs.asInstanceOf[Seq[V]],
+        w1s.asInstanceOf[Seq[W1]],
+        w2s.asInstanceOf[Seq[W2]],
+        w3s.asInstanceOf[Seq[W3]])
     }
   }

@@ -589,8 +589,8 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
       throw new SparkException("Default partitioner cannot partition array keys.")
     }
     val cg = new CoGroupedRDD[K](Seq(self, other), partitioner)
-    cg.mapValues { case Seq(vs, ws) =>
-      (vs.asInstanceOf[Seq[V]], ws.asInstanceOf[Seq[W]])
+    cg.mapValues { case Seq(vs, w1s) =>
+      (vs.asInstanceOf[Seq[V]], w1s.asInstanceOf[Seq[W]])
     }
   }

@@ -606,8 +606,8 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
     val cg = new CoGroupedRDD[K](Seq(self, other1, other2), partitioner)
     cg.mapValues { case Seq(vs, w1s, w2s) =>
       (vs.asInstanceOf[Seq[V]],
-         w1s.asInstanceOf[Seq[W1]],
-         w2s.asInstanceOf[Seq[W2]])
+        w1s.asInstanceOf[Seq[W1]],
+        w2s.asInstanceOf[Seq[W2]])
     }
   }

@@ -712,8 +712,8 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
     val index = p.getPartition(key)
     val process = (it: Iterator[(K, V)]) => {
       val buf = new ArrayBuffer[V]
-      for ((k, v) <- it if k == key) {
-        buf += v
+      for (pair <- it if pair._1 == key) {
+        buf += pair._2
       }
       buf
     } : Seq[V]
@@ -858,8 +858,8 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
       val writer = format.getRecordWriter(hadoopContext).asInstanceOf[NewRecordWriter[K,V]]
       try {
         while (iter.hasNext) {
-          val (k, v) = iter.next()
-          writer.write(k, v)
+          val pair = iter.next()
+          writer.write(pair._1, pair._2)
         }
       } finally {
         writer.close(hadoopContext)
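Apart from the accessor rewrites, the join variants touched above are all built on cogroup, so each output key pairs every left value with every right value (wrapped in Option for the outer joins). A short usage sketch, assuming an existing SparkContext named sc and, for Spark of this vintage, import org.apache.spark.SparkContext._ for the pair-RDD implicits:

    // Illustrative values only.
    val left  = sc.parallelize(Seq(("a", 1), ("a", 2), ("b", 3)))
    val right = sc.parallelize(Seq(("a", "x"), ("c", "y")))

    // join keeps only keys present on both sides; values are paired per key:
    //   ("a",(1,"x")), ("a",(2,"x"))
    left.join(right).collect()

    // leftOuterJoin keeps every left key, wrapping the right side in Option:
    //   ("a",(1,Some("x"))), ("a",(2,Some("x"))), ("b",(3,None))
    left.leftOuterJoin(right).collect()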

core/src/main/scala/org/apache/spark/rdd/RDD.scala

Lines changed: 48 additions & 4 deletions
@@ -1269,11 +1269,55 @@ abstract class RDD[T: ClassTag](

   /** A description of this RDD and its recursive dependencies for debugging. */
   def toDebugString: String = {
-    def debugString(rdd: RDD[_], prefix: String = ""): Seq[String] = {
-      Seq(prefix + rdd + " (" + rdd.partitions.size + " partitions)") ++
-        rdd.dependencies.flatMap(d => debugString(d.rdd, prefix + "  "))
+    // Apply a different rule to the last child
+    def debugChildren(rdd: RDD[_], prefix: String): Seq[String] = {
+      val len = rdd.dependencies.length
+      len match {
+        case 0 => Seq.empty
+        case 1 =>
+          val d = rdd.dependencies.head
+          debugString(d.rdd, prefix, d.isInstanceOf[ShuffleDependency[_,_,_]], true)
+        case _ =>
+          val frontDeps = rdd.dependencies.take(len - 1)
+          val frontDepStrings = frontDeps.flatMap(
+            d => debugString(d.rdd, prefix, d.isInstanceOf[ShuffleDependency[_,_,_]]))
+
+          val lastDep = rdd.dependencies.last
+          val lastDepStrings =
+            debugString(lastDep.rdd, prefix, lastDep.isInstanceOf[ShuffleDependency[_,_,_]], true)
+
+          (frontDepStrings ++ lastDepStrings)
+      }
+    }
+    // The first RDD in the dependency stack has no parents, so no need for a +-
+    def firstDebugString(rdd: RDD[_]): Seq[String] = {
+      val partitionStr = "(" + rdd.partitions.size + ")"
+      val leftOffset = (partitionStr.length - 1) / 2
+      val nextPrefix = (" " * leftOffset) + "|" + (" " * (partitionStr.length - leftOffset))
+      Seq(partitionStr + " " + rdd) ++ debugChildren(rdd, nextPrefix)
+    }
+    def shuffleDebugString(rdd: RDD[_], prefix: String = "", isLastChild: Boolean): Seq[String] = {
+      val partitionStr = "(" + rdd.partitions.size + ")"
+      val leftOffset = (partitionStr.length - 1) / 2
+      val thisPrefix = prefix.replaceAll("\\|\\s+$", "")
+      val nextPrefix = (
+        thisPrefix
+        + (if (isLastChild) "  " else "| ")
+        + (" " * leftOffset) + "|" + (" " * (partitionStr.length - leftOffset)))
+      Seq(thisPrefix + "+-" + partitionStr + " " + rdd) ++ debugChildren(rdd, nextPrefix)
+    }
+    def debugString(rdd: RDD[_],
+                    prefix: String = "",
+                    isShuffle: Boolean = true,
+                    isLastChild: Boolean = false): Seq[String] = {
+      if (isShuffle) {
+        shuffleDebugString(rdd, prefix, isLastChild)
+      }
+      else {
+        Seq(prefix + rdd) ++ debugChildren(rdd, prefix)
+      }
     }
-    debugString(this).mkString("\n")
+    firstDebugString(this).mkString("\n")
   }

   override def toString: String = "%s%s[%d] at %s".format(
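The new toDebugString renders the lineage as an indented tree: the root line carries its partition count, one-to-one parents are listed under a "|" gutter, and each shuffle boundary starts a new "+-(n)" branch. A rough illustration of the shape of the output (RDD names, call sites, and exact spacing are made up for this example):

    (2) MappedRDD[5] at map at <console>:18
     |  ShuffledRDD[4] at reduceByKey at <console>:17
     +-(4) MappedRDD[3] at map at <console>:16
        |  FlatMappedRDD[2] at flatMap at <console>:15
        |  HadoopRDD[1] at textFile at <console>:14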

core/src/main/scala/org/apache/spark/shuffle/hash/BlockStoreShuffleFetcher.scala

Lines changed: 0 additions & 1 deletion
@@ -81,7 +81,6 @@ private[hash] object BlockStoreShuffleFetcher extends Logging {
       shuffleMetrics.shuffleFinishTime = System.currentTimeMillis
       shuffleMetrics.fetchWaitTime = blockFetcherItr.fetchWaitTime
       shuffleMetrics.remoteBytesRead = blockFetcherItr.remoteBytesRead
-      shuffleMetrics.totalBlocksFetched = blockFetcherItr.totalBlocks
       shuffleMetrics.localBlocksFetched = blockFetcherItr.numLocalBlocks
       shuffleMetrics.remoteBlocksFetched = blockFetcherItr.numRemoteBlocks
       context.taskMetrics.updateShuffleReadMetrics(shuffleMetrics)

core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala

Lines changed: 2 additions & 4 deletions
@@ -46,7 +46,6 @@ import org.apache.spark.util.Utils
 private[storage]
 trait BlockFetcherIterator extends Iterator[(BlockId, Option[Iterator[Any]])] with Logging {
   def initialize()
-  def totalBlocks: Int
   def numLocalBlocks: Int
   def numRemoteBlocks: Int
   def fetchWaitTime: Long
@@ -180,9 +179,9 @@ object BlockFetcherIterator {
           if (curRequestSize >= targetRequestSize) {
             // Add this FetchRequest
             remoteRequests += new FetchRequest(address, curBlocks)
-            curRequestSize = 0
             curBlocks = new ArrayBuffer[(BlockId, Long)]
             logDebug(s"Creating fetch request of $curRequestSize at $address")
+            curRequestSize = 0
           }
         }
         // Add in the final request
@@ -192,7 +191,7 @@ object BlockFetcherIterator {
         }
       }
       logInfo("Getting " + _numBlocksToFetch + " non-empty blocks out of " +
-        totalBlocks + " blocks")
+        (numLocal + numRemote) + " blocks")
       remoteRequests
     }

@@ -235,7 +234,6 @@ object BlockFetcherIterator {
       logDebug("Got local blocks in " + Utils.getUsedTimeMs(startTime) + " ms")
    }

-    override def totalBlocks: Int = numLocal + numRemote
    override def numLocalBlocks: Int = numLocal
    override def numRemoteBlocks: Int = numRemote
    override def fetchWaitTime: Long = _fetchWaitTime
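Two small fixes above: curRequestSize is now reset after the logDebug that interpolates it, so the message reports the size of the request just created instead of zero, and the block total is computed inline now that totalBlocks has been dropped from the trait. A trivial sketch of the ordering point, with hypothetical names:

    // Read a counter for logging before resetting it, not after.
    object BatchLogSketch {
      var batchBytes = 0L
      def flushBatch(): Unit = {
        // Resetting first would make the message always report 0.
        println(s"Flushing batch of $batchBytes bytes") // read the counter ...
        batchBytes = 0L                                 // ... then reset it
      }
    }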

core/src/main/scala/org/apache/spark/ui/ToolTips.scala

Lines changed: 3 additions & 3 deletions
@@ -20,9 +20,9 @@ package org.apache.spark.ui
 private[spark] object ToolTips {
   val SCHEDULER_DELAY =
     """Scheduler delay includes time to ship the task from the scheduler to
-      the executor, and time the time to send a message from the executor to the scheduler stating
-      that the task has completed. When the scheduler becomes overloaded, task completion messages
-      become queued up, and scheduler delay increases."""
+      the executor, and time to send the task result from the executor to the scheduler. If
+      scheduler delay is large, consider decreasing the size of tasks or decreasing the size
+      of task results."""

   val INPUT = "Bytes read from Hadoop or from Spark storage."
