
Commit 717c9c3

Merge branch 'master' of git://git.apache.org/spark into SPARK-2583
2 parents: 6635467 + 98ab411

15 files changed, +448 -174 lines

core/src/main/scala/org/apache/spark/Dependency.scala

Lines changed: 10 additions & 18 deletions
@@ -27,9 +27,7 @@ import org.apache.spark.shuffle.ShuffleHandle
  * Base class for dependencies.
  */
 @DeveloperApi
-abstract class Dependency[T] extends Serializable {
-  def rdd: RDD[T]
-}
+abstract class Dependency[T](val rdd: RDD[T]) extends Serializable
 
 
 /**
@@ -38,47 +36,41 @@ abstract class Dependency[T] extends Serializable {
  * partition of the child RDD. Narrow dependencies allow for pipelined execution.
  */
 @DeveloperApi
-abstract class NarrowDependency[T](_rdd: RDD[T]) extends Dependency[T] {
+abstract class NarrowDependency[T](rdd: RDD[T]) extends Dependency(rdd) {
   /**
    * Get the parent partitions for a child partition.
    * @param partitionId a partition of the child RDD
    * @return the partitions of the parent RDD that the child partition depends upon
    */
   def getParents(partitionId: Int): Seq[Int]
-
-  override def rdd: RDD[T] = _rdd
 }
 
 
 /**
  * :: DeveloperApi ::
- * Represents a dependency on the output of a shuffle stage. Note that in the case of shuffle,
- * the RDD is transient since we don't need it on the executor side.
- *
- * @param _rdd the parent RDD
+ * Represents a dependency on the output of a shuffle stage.
+ * @param rdd the parent RDD
  * @param partitioner partitioner used to partition the shuffle output
  * @param serializer [[org.apache.spark.serializer.Serializer Serializer]] to use. If set to None,
  *                   the default serializer, as specified by `spark.serializer` config option, will
  *                   be used.
  */
 @DeveloperApi
 class ShuffleDependency[K, V, C](
-    @transient _rdd: RDD[_ <: Product2[K, V]],
+    @transient rdd: RDD[_ <: Product2[K, V]],
     val partitioner: Partitioner,
     val serializer: Option[Serializer] = None,
     val keyOrdering: Option[Ordering[K]] = None,
    val aggregator: Option[Aggregator[K, V, C]] = None,
     val mapSideCombine: Boolean = false)
-  extends Dependency[Product2[K, V]] {
-
-  override def rdd = _rdd.asInstanceOf[RDD[Product2[K, V]]]
+  extends Dependency(rdd.asInstanceOf[RDD[Product2[K, V]]]) {
 
-  val shuffleId: Int = _rdd.context.newShuffleId()
+  val shuffleId: Int = rdd.context.newShuffleId()
 
-  val shuffleHandle: ShuffleHandle = _rdd.context.env.shuffleManager.registerShuffle(
-    shuffleId, _rdd.partitions.size, this)
+  val shuffleHandle: ShuffleHandle = rdd.context.env.shuffleManager.registerShuffle(
+    shuffleId, rdd.partitions.size, this)
 
-  _rdd.sparkContext.cleaner.foreach(_.registerShuffleForCleanup(this))
+  rdd.sparkContext.cleaner.foreach(_.registerShuffleForCleanup(this))
 }
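
The refactor above replaces the abstract `def rdd` with a constructor-level `val rdd` on the base class, so subclasses hand their parent RDD to the superclass constructor instead of overriding an accessor. The standalone sketch below mirrors that shape outside Spark; FakeRDD and OneToOneDep are stand-ins invented for illustration, not Spark classes.

// Minimal, self-contained sketch of the new Dependency shape (not Spark code).
class FakeRDD[T](val id: Int)

abstract class Dependency[T](val rdd: FakeRDD[T]) extends Serializable

abstract class NarrowDependency[T](rdd: FakeRDD[T]) extends Dependency(rdd) {
  def getParents(partitionId: Int): Seq[Int]
}

// No `override def rdd` needed any more: the parent flows through the constructor.
class OneToOneDep[T](rdd: FakeRDD[T]) extends NarrowDependency[T](rdd) {
  def getParents(partitionId: Int): Seq[Int] = List(partitionId)
}

object DependencySketch {
  def main(args: Array[String]): Unit = {
    val dep = new OneToOneDep(new FakeRDD[Int](42))
    println(dep.rdd.id)         // 42, exposed by the base-class val
    println(dep.getParents(3))  // List(3)
  }
}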

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 2 additions & 0 deletions
@@ -997,6 +997,8 @@ class SparkContext(config: SparkConf) extends Logging {
       // TODO: Cache.stop()?
       env.stop()
       SparkEnv.set(null)
+      ShuffleMapTask.clearCache()
+      ResultTask.clearCache()
       listenerBus.stop()
       eventLogger.foreach(_.stop())
       logInfo("Successfully stopped SparkContext")
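
SparkContext.stop() now also clears state kept in the ShuffleMapTask and ResultTask companion objects; the removeStage(stageId) calls added in DAGScheduler further down suggest this state is keyed per stage. As a rough, self-contained sketch of that kind of companion-object cache, the object below is purely illustrative (the name TaskBinaryCache and its layout are assumptions, not Spark's actual internals):

import scala.collection.mutable

// Illustrative per-stage cache with the same clearCache/removeStage surface.
object TaskBinaryCache {
  private val cache = new mutable.HashMap[Int, Array[Byte]]

  def put(stageId: Int, bytes: Array[Byte]): Unit = synchronized { cache(stageId) = bytes }

  def get(stageId: Int): Option[Array[Byte]] = synchronized { cache.get(stageId) }

  // Drop one stage's entry, as the DAGScheduler change below does per stage.
  def removeStage(stageId: Int): Unit = synchronized { cache -= stageId }

  // Drop everything, as SparkContext.stop() and RDDCheckpointData.clearTaskCaches() do.
  def clearCache(): Unit = synchronized { cache.clear() }
}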

core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala

Lines changed: 2 additions & 2 deletions
@@ -170,12 +170,12 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part:
 
     val createCombiner: (CoGroupValue => CoGroupCombiner) = value => {
       val newCombiner = Array.fill(numRdds)(new CoGroup)
-      value match { case (v, depNum) => newCombiner(depNum) += v }
+      newCombiner(value._2) += value._1
       newCombiner
     }
     val mergeValue: (CoGroupCombiner, CoGroupValue) => CoGroupCombiner =
       (combiner, value) => {
-      value match { case (v, depNum) => combiner(depNum) += v }
+      combiner(value._2) += value._1
       combiner
     }
     val mergeCombiners: (CoGroupCombiner, CoGroupCombiner) => CoGroupCombiner =
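
For readers following the createCombiner/mergeValue change, the sketch below reproduces the same logic with plain Scala collections: a co-group value is a (value, parentIndex) pair and the combiner keeps one buffer per parent. The simplified type aliases and the object name are illustrative, not Spark code.

import scala.collection.mutable.ArrayBuffer

object CoGroupCombinerSketch {
  type CoGroupValue    = (Any, Int)                 // (value, index of its parent RDD)
  type CoGroupCombiner = Array[ArrayBuffer[Any]]

  def main(args: Array[String]): Unit = {
    val numRdds = 2

    val createCombiner: CoGroupValue => CoGroupCombiner = value => {
      val newCombiner = Array.fill(numRdds)(new ArrayBuffer[Any])
      newCombiner(value._2) += value._1             // same accessor style as the diff
      newCombiner
    }
    val mergeValue: (CoGroupCombiner, CoGroupValue) => CoGroupCombiner =
      (combiner, value) => { combiner(value._2) += value._1; combiner }

    val values: Seq[CoGroupValue] = Seq(("x", 0), ("y", 1), ("z", 0))
    val combined = values.foldLeft(createCombiner(("w", 0)))(mergeValue)
    println(combined.map(_.toList).toList)          // List(List(w, x, z), List(y))
  }
}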

core/src/main/scala/org/apache/spark/rdd/MappedValuesRDD.scala

Lines changed: 1 addition & 1 deletion
@@ -28,6 +28,6 @@ class MappedValuesRDD[K, V, U](prev: RDD[_ <: Product2[K, V]], f: V => U)
   override val partitioner = firstParent[Product2[K, U]].partitioner
 
   override def compute(split: Partition, context: TaskContext): Iterator[(K, U)] = {
-    firstParent[Product2[K, V]].iterator(split, context).map { case Product2(k ,v) => (k, f(v)) }
+    firstParent[Product2[K, V]].iterator(split, context).map { pair => (pair._1, f(pair._2)) }
   }
 }
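
The change in compute() above, like the PairRDDFunctions changes below, swaps a pattern-matching closure ({ case (k, v) => ... }) for plain tuple accessors (pair._1, pair._2). The two forms are interchangeable for pairs, as this small self-contained check illustrates:

object PairClosureStyles {
  def main(args: Array[String]): Unit = {
    val pairs = List(("a", 1), ("b", 2))

    val viaMatch     = pairs.map { case (k, v) => (k, v + 1) }      // style being removed
    val viaAccessors = pairs.map { pair => (pair._1, pair._2 + 1) } // style being introduced

    println(viaMatch == viaAccessors)  // true
  }
}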

core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala

Lines changed: 30 additions & 30 deletions
@@ -216,17 +216,17 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
 
     val reducePartition = (iter: Iterator[(K, V)]) => {
       val map = new JHashMap[K, V]
-      iter.foreach { case (k, v) =>
-        val old = map.get(k)
-        map.put(k, if (old == null) v else func(old, v))
+      iter.foreach { pair =>
+        val old = map.get(pair._1)
+        map.put(pair._1, if (old == null) pair._2 else func(old, pair._2))
       }
       Iterator(map)
     } : Iterator[JHashMap[K, V]]
 
     val mergeMaps = (m1: JHashMap[K, V], m2: JHashMap[K, V]) => {
-      m2.foreach { case (k, v) =>
-        val old = m1.get(k)
-        m1.put(k, if (old == null) v else func(old, v))
+      m2.foreach { pair =>
+        val old = m1.get(pair._1)
+        m1.put(pair._1, if (old == null) pair._2 else func(old, pair._2))
       }
       m1
     } : JHashMap[K, V]
@@ -401,9 +401,9 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
    * (k, v2) is in `other`. Uses the given Partitioner to partition the output RDD.
    */
   def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))] = {
-    this.cogroup(other, partitioner).flatMapValues { case (vs, ws) =>
-      for (v <- vs; w <- ws) yield (v, w)
-    }
+    this.cogroup(other, partitioner).flatMapValues( pair =>
+      for (v <- pair._1; w <- pair._2) yield (v, w)
+    )
   }
 
   /**
@@ -413,11 +413,11 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
    * partition the output RDD.
    */
   def leftOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, Option[W]))] = {
-    this.cogroup(other, partitioner).flatMapValues { case (vs, ws) =>
-      if (ws.isEmpty) {
-        vs.map(v => (v, None))
+    this.cogroup(other, partitioner).flatMapValues { pair =>
+      if (pair._2.isEmpty) {
+        pair._1.map(v => (v, None))
       } else {
-        for (v <- vs; w <- ws) yield (v, Some(w))
+        for (v <- pair._1; w <- pair._2) yield (v, Some(w))
       }
     }
   }
@@ -430,11 +430,11 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
    */
   def rightOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner)
       : RDD[(K, (Option[V], W))] = {
-    this.cogroup(other, partitioner).flatMapValues { case (vs, ws) =>
-      if (vs.isEmpty) {
-        ws.map(w => (None, w))
+    this.cogroup(other, partitioner).flatMapValues { pair =>
+      if (pair._1.isEmpty) {
+        pair._2.map(w => (None, w))
       } else {
-        for (v <- vs; w <- ws) yield (Some(v), w)
+        for (v <- pair._1; w <- pair._2) yield (Some(v), w)
       }
     }
   }
@@ -535,7 +535,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
     val data = self.collect()
     val map = new mutable.HashMap[K, V]
     map.sizeHint(data.length)
-    data.foreach { case (k, v) => map.put(k, v) }
+    data.foreach { pair => map.put(pair._1, pair._2) }
     map
   }
 
@@ -572,10 +572,10 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
     }
     val cg = new CoGroupedRDD[K](Seq(self, other1, other2, other3), partitioner)
     cg.mapValues { case Seq(vs, w1s, w2s, w3s) =>
-      (vs.asInstanceOf[Seq[V]],
-        w1s.asInstanceOf[Seq[W1]],
-        w2s.asInstanceOf[Seq[W2]],
-        w3s.asInstanceOf[Seq[W3]])
+       (vs.asInstanceOf[Seq[V]],
+         w1s.asInstanceOf[Seq[W1]],
+         w2s.asInstanceOf[Seq[W2]],
+         w3s.asInstanceOf[Seq[W3]])
     }
   }
 
@@ -589,8 +589,8 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
       throw new SparkException("Default partitioner cannot partition array keys.")
     }
     val cg = new CoGroupedRDD[K](Seq(self, other), partitioner)
-    cg.mapValues { case Seq(vs, ws) =>
-      (vs.asInstanceOf[Seq[V]], ws.asInstanceOf[Seq[W]])
+    cg.mapValues { case Seq(vs, w1s) =>
+      (vs.asInstanceOf[Seq[V]], w1s.asInstanceOf[Seq[W]])
     }
   }
 
@@ -606,8 +606,8 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
     val cg = new CoGroupedRDD[K](Seq(self, other1, other2), partitioner)
     cg.mapValues { case Seq(vs, w1s, w2s) =>
       (vs.asInstanceOf[Seq[V]],
-        w1s.asInstanceOf[Seq[W1]],
-        w2s.asInstanceOf[Seq[W2]])
+         w1s.asInstanceOf[Seq[W1]],
+         w2s.asInstanceOf[Seq[W2]])
     }
   }
 
@@ -712,8 +712,8 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
     val index = p.getPartition(key)
     val process = (it: Iterator[(K, V)]) => {
       val buf = new ArrayBuffer[V]
-      for ((k, v) <- it if k == key) {
-        buf += v
+      for (pair <- it if pair._1 == key) {
+        buf += pair._2
       }
       buf
     } : Seq[V]
@@ -858,8 +858,8 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
       val writer = format.getRecordWriter(hadoopContext).asInstanceOf[NewRecordWriter[K,V]]
       try {
         while (iter.hasNext) {
-          val (k, v) = iter.next()
-          writer.write(k, v)
+          val pair = iter.next()
+          writer.write(pair._1, pair._2)
         }
       } finally {
         writer.close(hadoopContext)
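
The first hunk above keeps the same two-step scheme around its reducePartition/mergeMaps pair: build a JHashMap per partition, then merge the maps with the reduce function. Below is a minimal sketch of that scheme in plain Scala with no Spark APIs; the object and helper names are invented for illustration.

import java.util.{HashMap => JHashMap}
import scala.collection.JavaConverters._

object LocalReduceByKeySketch {
  // Build one map per partition, combining values for duplicate keys with func.
  def reducePartition[K, V](iter: Iterator[(K, V)], func: (V, V) => V): JHashMap[K, V] = {
    val map = new JHashMap[K, V]
    iter.foreach { pair =>
      val old = map.get(pair._1)
      map.put(pair._1, if (old == null) pair._2 else func(old, pair._2))
    }
    map
  }

  // Fold m2 into m1, again combining values for shared keys with func.
  def mergeMaps[K, V](m1: JHashMap[K, V], m2: JHashMap[K, V], func: (V, V) => V): JHashMap[K, V] = {
    m2.asScala.foreach { pair =>
      val old = m1.get(pair._1)
      m1.put(pair._1, if (old == null) pair._2 else func(old, pair._2))
    }
    m1
  }

  def main(args: Array[String]): Unit = {
    val partitions = Seq(
      Iterator(("a", 1), ("b", 2), ("a", 3)),
      Iterator(("b", 10), ("c", 5)))
    val sum = (x: Int, y: Int) => x + y
    val merged = partitions.map(reducePartition(_, sum)).reduce(mergeMaps(_, _, sum))
    println(merged)  // e.g. {a=4, b=12, c=5} (JHashMap iteration order is incidental)
  }
}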

core/src/main/scala/org/apache/spark/rdd/RDD.scala

Lines changed: 7 additions & 23 deletions
@@ -35,13 +35,12 @@ import org.apache.spark.Partitioner._
 import org.apache.spark.SparkContext._
 import org.apache.spark.annotation.{DeveloperApi, Experimental}
 import org.apache.spark.api.java.JavaRDD
-import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.partial.BoundedDouble
 import org.apache.spark.partial.CountEvaluator
 import org.apache.spark.partial.GroupedCountEvaluator
 import org.apache.spark.partial.PartialResult
 import org.apache.spark.storage.StorageLevel
-import org.apache.spark.util.{BoundedPriorityQueue, Utils}
+import org.apache.spark.util.{BoundedPriorityQueue, CallSite, Utils}
 import org.apache.spark.util.collection.OpenHashMap
 import org.apache.spark.util.random.{BernoulliSampler, PoissonSampler, SamplingUtils}
 
@@ -1196,36 +1195,21 @@ abstract class RDD[T: ClassTag](
   /**
    * Return whether this RDD has been checkpointed or not
    */
-  def isCheckpointed: Boolean = checkpointData.exists(_.isCheckpointed)
+  def isCheckpointed: Boolean = {
+    checkpointData.map(_.isCheckpointed).getOrElse(false)
+  }
 
   /**
    * Gets the name of the file to which this RDD was checkpointed
   */
-  def getCheckpointFile: Option[String] = checkpointData.flatMap(_.getCheckpointFile)
+  def getCheckpointFile: Option[String] = {
+    checkpointData.flatMap(_.getCheckpointFile)
+  }
 
   // =======================================================================
   // Other internal methods and fields
   // =======================================================================
 
-  /**
-   * Broadcasted copy of this RDD, used to dispatch tasks to executors. Note that we broadcast
-   * the serialized copy of the RDD and for each task we will deserialize it, which means each
-   * task gets a different copy of the RDD. This provides stronger isolation between tasks that
-   * might modify state of objects referenced in their closures. This is necessary in Hadoop
-   * where the JobConf/Configuration object is not thread-safe.
-   */
-  @transient private[spark] lazy val broadcasted: Broadcast[Array[Byte]] = {
-    val ser = SparkEnv.get.closureSerializer.newInstance()
-    val bytes = ser.serialize(this).array()
-    val size = Utils.bytesToString(bytes.length)
-    if (bytes.length > (1L << 20)) {
-      logWarning(s"Broadcasting RDD $id ($size), which contains large objects")
-    } else {
-      logDebug(s"Broadcasting RDD $id ($size)")
-    }
-    sc.broadcast(bytes)
-  }
-
   private var storageLevel: StorageLevel = StorageLevel.NONE
 
   /** User code that created this RDD (e.g. `textFile`, `parallelize`). */
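
The rewritten isCheckpointed simply spells out Option.exists as map(...).getOrElse(false); the two bodies agree for every input. A quick standalone check, where CheckpointStub is a made-up stand-in for RDDCheckpointData:

object IsCheckpointedSketch {
  final case class CheckpointStub(isCheckpointed: Boolean)

  def main(args: Array[String]): Unit = {
    val cases = Seq(None, Some(CheckpointStub(false)), Some(CheckpointStub(true)))
    cases.foreach { checkpointData =>
      val oldForm = checkpointData.exists(_.isCheckpointed)
      val newForm = checkpointData.map(_.isCheckpointed).getOrElse(false)
      println(s"$checkpointData -> old=$oldForm new=$newForm")  // always equal
    }
  }
}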

core/src/main/scala/org/apache/spark/rdd/RDDCheckpointData.scala

Lines changed: 7 additions & 2 deletions
@@ -106,6 +106,7 @@ private[spark] class RDDCheckpointData[T: ClassTag](@transient rdd: RDD[T])
       cpRDD = Some(newRDD)
       rdd.markCheckpointed(newRDD)   // Update the RDD's dependencies and partitions
       cpState = Checkpointed
+      RDDCheckpointData.clearTaskCaches()
     }
     logInfo("Done checkpointing RDD " + rdd.id + " to " + path + ", new parent is RDD " + newRDD.id)
   }
@@ -130,5 +131,9 @@ private[spark] class RDDCheckpointData[T: ClassTag](@transient rdd: RDD[T])
   }
 }
 
-// Used for synchronization
-private[spark] object RDDCheckpointData
+private[spark] object RDDCheckpointData {
+  def clearTaskCaches() {
+    ShuffleMapTask.clearCache()
+    ResultTask.clearCache()
+  }
+}

core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala

Lines changed: 4 additions & 0 deletions
@@ -376,6 +376,9 @@ class DAGScheduler(
         stageIdToStage -= stageId
         stageIdToJobIds -= stageId
 
+        ShuffleMapTask.removeStage(stageId)
+        ResultTask.removeStage(stageId)
+
         logDebug("After removal of stage %d, remaining stages = %d"
           .format(stageId, stageIdToStage.size))
       }
@@ -720,6 +723,7 @@ class DAGScheduler(
     }
   }
 
+
   /** Called when stage's parents are available and we can now do its task. */
   private def submitMissingTasks(stage: Stage, jobId: Int) {
     logDebug("submitMissingTasks(" + stage + ")")
