Do not re-use objects in the EdgePartition/EdgeTriplet iterators. #276

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed · wants to merge 7 commits
3 changes: 2 additions & 1 deletion graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala
@@ -45,7 +45,8 @@ class EdgeRDD[@specialized ED: ClassTag](
     partitionsRDD.partitioner.orElse(Some(Partitioner.defaultPartitioner(partitionsRDD)))
 
   override def compute(part: Partition, context: TaskContext): Iterator[Edge[ED]] = {
-    firstParent[(PartitionID, EdgePartition[ED])].iterator(part, context).next._2.iterator
+    val p = firstParent[(PartitionID, EdgePartition[ED])].iterator(part, context)
+    p.next._2.iterator.map(_.copy())
   }
 
   override def collect(): Array[Edge[ED]] = this.map(_.copy()).collect()
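The practical effect of the `compute` change: iterators obtained from `EdgeRDD` partitions now yield independent `Edge` objects instead of one re-used instance, so downstream operations may retain them. A minimal caller-side sketch, assuming a running SparkContext named `sc` (the small graph below is illustrative and not part of this diff):

    import org.apache.spark.graphx._

    // Build a tiny graph; Graph.fromEdges gives every vertex the default attribute 0.
    val edges = sc.parallelize(Seq(Edge(1L, 2L, "a"), Edge(2L, 3L, "b")))
    val graph = Graph.fromEdges(edges, 0)

    // Derived RDDs read edges through EdgeRDD.compute, which now copies each Edge,
    // so the collected array holds independent objects rather than aliases.
    val fromSrc1: Array[Edge[String]] = graph.edges.filter(_.srcId == 1L).collect()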
graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala
@@ -56,6 +56,9 @@ class EdgePartition[@specialized(Char, Int, Boolean, Byte, Long, Float, Double)
    * Construct a new edge partition by applying the function f to all
    * edges in this partition.
    *
+   * Be careful not to keep references to the objects passed to `f`.
+   * To improve GC performance the same object is re-used for each call.
+   *
    * @param f a function from an edge to a new attribute
    * @tparam ED2 the type of the new attribute
    * @return a new edge partition with the result of the function `f`
@@ -84,12 +87,12 @@ class EdgePartition[@specialized(Char, Int, Boolean, Byte, Long, Float, Double)
    * order of the edges returned by `EdgePartition.iterator` and
    * should return attributes equal to the number of edges.
    *
-   * @param f a function from an edge to a new attribute
+   * @param iter an iterator for the new attribute values
    * @tparam ED2 the type of the new attribute
-   * @return a new edge partition with the result of the function `f`
-   *         applied to each edge
+   * @return a new edge partition with the attribute values replaced
    */
   def map[ED2: ClassTag](iter: Iterator[ED2]): EdgePartition[ED2] = {
+    // Faster than iter.toArray, because the expected size is known.
     val newData = new Array[ED2](data.size)
     var i = 0
     while (iter.hasNext) {
@@ -188,6 +191,9 @@ class EdgePartition[@specialized(Char, Int, Boolean, Byte, Long, Float, Double)
   /**
    * Get an iterator over the edges in this partition.
    *
+   * Be careful not to keep references to the objects from this iterator.
+   * To improve GC performance the same object is re-used in `next()`.
+   *
    * @return an iterator over edges in the partition
    */
   def iterator = new Iterator[Edge[ED]] {
@@ -216,6 +222,9 @@ class EdgePartition[@specialized(Char, Int, Boolean, Byte, Long, Float, Double)
   /**
    * Get an iterator over the cluster of edges in this partition with source vertex id `srcId`. The
    * cluster must start at position `index`.
+   *
+   * Be careful not to keep references to the objects from this iterator. To improve GC performance
+   * the same object is re-used in `next()`.
    */
   private def clusterIterator(srcId: VertexId, index: Int) = new Iterator[Edge[ED]] {
     private[this] val edge = new Edge[ED]
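The hazard these new doc comments warn about is easiest to see when the iterator is materialized. A small sketch using `EdgePartitionBuilder`, as the new test below does; these classes are GraphX internals, so this only illustrates the documented semantics:

    val builder = new EdgePartitionBuilder[Int]
    builder.add(1L, 2L, 10)
    builder.add(1L, 3L, 20)
    val part = builder.toEdgePartition

    // Risky: iterator re-uses one Edge object per next(), so retaining the elements
    // without copying can leave every slot pointing at the same mutated instance.
    val aliased = part.iterator.toList

    // Safe: copy() materializes an independent Edge for each element.
    val copied = part.iterator.map(_.copy()).toList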
graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeTripletIterator.scala
@@ -37,20 +37,15 @@ class EdgeTripletIterator[VD: ClassTag, ED: ClassTag](
   // Current position in the array.
   private var pos = 0
 
-  // A triplet object that this iterator.next() call returns. We reuse this object to avoid
-  // allocating too many temporary Java objects.
-  private val triplet = new EdgeTriplet[VD, ED]
-
   private val vmap = new PrimitiveKeyOpenHashMap[VertexId, VD](vidToIndex, vertexArray)
 
   override def hasNext: Boolean = pos < edgePartition.size
 
   override def next() = {
+    val triplet = new EdgeTriplet[VD, ED]
     triplet.srcId = edgePartition.srcIds(pos)
-    // assert(vmap.containsKey(e.src.id))
     triplet.srcAttr = vmap(triplet.srcId)
     triplet.dstId = edgePartition.dstIds(pos)
-    // assert(vmap.containsKey(e.dst.id))
     triplet.dstAttr = vmap(triplet.dstId)
     triplet.attr = edgePartition.data(pos)
     pos += 1
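At the API level this matters because `Graph.triplets` is backed by `EdgeTripletIterator` in `GraphImpl`. A hedged sketch of the behavior the per-call allocation fixes, assuming `graph` is the `Graph[Int, String]` from the earlier sketch:

    // next() now allocates a fresh EdgeTriplet, so the collected array holds distinct
    // triplets instead of many references to one object carrying only the last edge's values.
    val triplets = graph.triplets.collect()
    val bySource = triplets.groupBy(_.srcId)  // safe to retain and regroup the triplet objects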
graphx/src/test/scala/org/apache/spark/graphx/impl/EdgeTripletIteratorSuite.scala
@@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.graphx.impl

import scala.reflect.ClassTag
import scala.util.Random

import org.scalatest.FunSuite

import org.apache.spark.graphx._

class EdgeTripletIteratorSuite extends FunSuite {
test("iterator.toList") {
val builder = new EdgePartitionBuilder[Int]
builder.add(1, 2, 0)
builder.add(1, 3, 0)
builder.add(1, 4, 0)
val vidmap = new VertexIdToIndexMap
vidmap.add(1)
vidmap.add(2)
vidmap.add(3)
vidmap.add(4)
val vs = Array.fill(vidmap.capacity)(0)
val iter = new EdgeTripletIterator[Int, Int](vidmap, vs, builder.toEdgePartition)
val result = iter.toList.map(et => (et.srcId, et.dstId))
assert(result === Seq((1, 2), (1, 3), (1, 4)))
}
}