Skip to content

Commit 4955697

Browse files
committed
Create a copy of the Edge objects in EdgeRDD.compute(). This avoids exposing the object re-use, while still enables the more efficient behavior for internal code.
1 parent 4ec77f8 commit 4955697

File tree

1 file changed

+2
-1
lines changed

1 file changed

+2
-1
lines changed

graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,8 @@ class EdgeRDD[@specialized ED: ClassTag](
4545
partitionsRDD.partitioner.orElse(Some(Partitioner.defaultPartitioner(partitionsRDD)))
4646

4747
override def compute(part: Partition, context: TaskContext): Iterator[Edge[ED]] = {
48-
firstParent[(PartitionID, EdgePartition[ED])].iterator(part, context).next._2.iterator
48+
val p = firstParent[(PartitionID, EdgePartition[ED])].iterator(part, context)
49+
p.next._2.iterator.map(_.copy())
4950
}
5051

5152
override def collect(): Array[Edge[ED]] = this.map(_.copy()).collect()

0 commit comments

Comments
 (0)