@@ -22,6 +22,7 @@ import java.util.Random
 import scala.collection.Map
 import scala.collection.JavaConversions.mapAsScalaMap
 import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable.HashSet
 import scala.reflect.{classTag, ClassTag}
 
 import com.clearspring.analytics.stream.cardinality.HyperLogLog
@@ -221,12 +222,22 @@ abstract class RDD[T: ClassTag](
     }
   }
 
+  private val previouslyComputed = new HashSet[Partition]
   /**
    * Compute an RDD partition or read it from a checkpoint if the RDD is checkpointing.
    */
   private[spark] def computeOrReadCheckpoint(split: Partition, context: TaskContext): Iterator[T] =
   {
-    if (isCheckpointed) firstParent[T].iterator(split, context) else compute(split, context)
+    if (isCheckpointed) {
+      firstParent[T].iterator(split, context)
+    } else {
+      if (previouslyComputed.contains(split)) {
+        logWarning("Recomputing RDD %d, partition %d".format(id, split.index))
+      } else {
+        previouslyComputed.add(split)
+      }
+      compute(split, context)
+    }
   }
232243 // Transformations (return a new RDD)
@@ -1045,6 +1056,8 @@ abstract class RDD[T: ClassTag](
 
   private[spark] def elementClassTag: ClassTag[T] = classTag[T]
 
+  private[spark] val computeSites = new ArrayBuffer[String]
+
   private[spark] var checkpointData: Option[RDDCheckpointData[T]] = None
 
   /** Returns the first parent RDD */
0 commit comments