[SPARK-6606][CORE] Accumulator deserialized twice because the NarrowCoGroupSplitDep contains rdd object. #5259
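PR summary: NarrowCoGroupSplitDep holds a reference to its parent RDD, so that RDD graph (and any accumulators captured in its closures) is serialized both with the task binary and again via the partition's dependency list, and is therefore deserialized twice on the executor. This patch marks the dep's rdd field @transient and has compute() look the parent up through the enclosing RDD's rdds / rdd1 / rdd2 fields instead. The snippet below is a minimal, self-contained sketch of the underlying mechanism, using plain Java serialization and illustrative names only (Payload, TaskBinary, PartitionDep are not Spark classes): an object reachable from two independently serialized containers comes back as two distinct copies.

import java.io._

// Illustrative names only; none of these are Spark classes.
class Payload extends Serializable                    // stands in for the parent RDD
case class TaskBinary(payload: Payload)               // serialized with the task binary
case class PartitionDep(payload: Payload)             // serialized again inside a partition

object DoubleDeserializationSketch {
  def roundTrip[T <: Serializable](value: T): T = {
    val buffer = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buffer)
    out.writeObject(value)
    out.close()
    new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray))
      .readObject().asInstanceOf[T]
  }

  def main(args: Array[String]): Unit = {
    val shared = new Payload
    val fromTask = roundTrip(TaskBinary(shared))       // first, independent round trip
    val fromDep  = roundTrip(PartitionDep(shared))     // second, independent round trip
    // Two separate serializations yield two distinct copies of `shared`;
    // in Spark, each deserialized copy of the RDD graph registers its accumulators again.
    println(fromTask.payload eq fromDep.payload)       // prints: false
  }
}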

Closed
wants to merge 1 commit into from
8 changes: 4 additions & 4 deletions core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
@@ -34,7 +34,7 @@ import org.apache.spark.shuffle.ShuffleHandle
 private[spark] sealed trait CoGroupSplitDep extends Serializable

 private[spark] case class NarrowCoGroupSplitDep(
-    rdd: RDD[_],
+    @transient rdd: RDD[_],
     splitIndex: Int,
     var split: Partition
   ) extends CoGroupSplitDep {
@@ -67,7 +67,7 @@ private[spark] class CoGroupPartition(idx: Int, val deps: Array[CoGroupSplitDep]
  * @param part partitioner used to partition the shuffle output
  */
 @DeveloperApi
-class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part: Partitioner)
+class CoGroupedRDD[K](var rdds: Seq[RDD[_ <: Product2[K, _]]], part: Partitioner)
   extends RDD[(K, Array[Iterable[_]])](rdds.head.context, Nil) {

   // For example, `(k, a) cogroup (k, b)` produces k -> Array(ArrayBuffer as, ArrayBuffer bs).
@@ -125,9 +125,9 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part:
     // A list of (rdd iterator, dependency number) pairs
     val rddIterators = new ArrayBuffer[(Iterator[Product2[K, Any]], Int)]
     for ((dep, depNum) <- split.deps.zipWithIndex) dep match {
-      case NarrowCoGroupSplitDep(rdd, _, itsSplit) =>
+      case NarrowCoGroupSplitDep(_, _, itsSplit) =>
         // Read them from the parent
-        val it = rdd.iterator(itsSplit, context).asInstanceOf[Iterator[Product2[K, Any]]]
+        val it = rdds(depNum).iterator(itsSplit, context).asInstanceOf[Iterator[Product2[K, Any]]]
         rddIterators += ((it, depNum))

       case ShuffleCoGroupSplitDep(handle) =>
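With this change, compute() no longer reads the parent RDD out of the dep (which forced the dep to carry a serialized copy of that RDD); it indexes into rdds by dependency number, which is why @transient moves off the rdds constructor parameter and onto NarrowCoGroupSplitDep.rdd. A hedged repro sketch of the kind of job that exercises this path, assuming a spark-shell style SparkContext named sc (the accumulator and the map step are illustrative, not part of this PR):

// Illustrative only: an accumulator captured by a parent of a cogroup.
// Before this patch, the parent RDD's closures travelled with every
// NarrowCoGroupSplitDep as well as with the task binary.
val seen = sc.accumulator(0)
val left  = sc.parallelize(Seq(1 -> "a", 2 -> "b")).map { kv => seen += 1; kv }
val right = sc.parallelize(Seq(1 -> "x", 2 -> "y"))
left.cogroup(right).collect()
println(seen.value)   // in a single clean evaluation, each left record counts once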
12 changes: 6 additions & 6 deletions core/src/main/scala/org/apache/spark/rdd/SubtractedRDD.scala
@@ -49,8 +49,8 @@ import org.apache.spark.serializer.Serializer
  * out of memory because of the size of `rdd2`.
  */
 private[spark] class SubtractedRDD[K: ClassTag, V: ClassTag, W: ClassTag](
-    @transient var rdd1: RDD[_ <: Product2[K, V]],
-    @transient var rdd2: RDD[_ <: Product2[K, W]],
+    var rdd1: RDD[_ <: Product2[K, V]],
+    var rdd2: RDD[_ <: Product2[K, W]],
     part: Partitioner)
   extends RDD[(K, V)](rdd1.context, Nil) {

@@ -105,8 +105,8 @@ private[spark] class SubtractedRDD[K: ClassTag, V: ClassTag, W: ClassTag](
         seq
       }
     }
-    def integrate(dep: CoGroupSplitDep, op: Product2[K, V] => Unit): Unit = dep match {
-      case NarrowCoGroupSplitDep(rdd, _, itsSplit) =>
+    def integrate(rdd: RDD[_], dep: CoGroupSplitDep, op: Product2[K, V] => Unit): Unit = dep match {
+      case NarrowCoGroupSplitDep(_, _, itsSplit) =>
         rdd.iterator(itsSplit, context).asInstanceOf[Iterator[Product2[K, V]]].foreach(op)

       case ShuffleCoGroupSplitDep(handle) =>
@@ -116,9 +116,9 @@
       iter.foreach(op)
     }
     // the first dep is rdd1; add all values to the map
-    integrate(partition.deps(0), t => getSeq(t._1) += t._2)
+    integrate(rdd1, partition.deps(0), t => getSeq(t._1) += t._2)
     // the second dep is rdd2; remove all of its keys
-    integrate(partition.deps(1), t => map.remove(t._1))
+    integrate(rdd2, partition.deps(1), t => map.remove(t._1))
     map.iterator.map { t => t._2.iterator.map { (t._1, _) } }.flatten
   }

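The SubtractedRDD change follows the same pattern: integrate() now receives the parent RDD as an explicit argument (rdd1 or rdd2, which are serialized once with the RDD itself) instead of binding it from the now-@transient rdd field of NarrowCoGroupSplitDep. For reference, a small usage sketch of the public operation backed by this RDD, assuming a SparkContext named sc (illustrative, not part of the patch):

// Illustrative only: subtractByKey is implemented by SubtractedRDD.
val a = sc.parallelize(Seq(1 -> "keep", 2 -> "drop"))
val b = sc.parallelize(Seq(2 -> "other"))
a.subtractByKey(b).collect()   // Array((1, "keep"))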