Skip to content

Commit 0182f2b

Browse files
committed
Do not re-use objects in the EdgePartition/EdgeTriplet iterators. This avoids a silent data corruption issue (SPARK-1188) and has no performance impact in my measurements. It also simplifies the code.
1 parent c55f52f commit 0182f2b

File tree

2 files changed

+9
-36
lines changed

2 files changed

+9
-36
lines changed

graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala

Lines changed: 8 additions & 30 deletions
Original file line number | Diff line number | Diff line change
@@ -62,18 +62,8 @@ class EdgePartition[@specialized(Char, Int, Boolean, Byte, Long, Float, Double)
6262
* applied to each edge
6363
*/
6464
def map[ED2: ClassTag](f: Edge[ED] => ED2): EdgePartition[ED2] = {
65-
val newData = new Array[ED2](data.size)
66-
val edge = new Edge[ED]()
67-
val size = data.size
68-
var i = 0
69-
while (i < size) {
70-
edge.srcId = srcIds(i)
71-
edge.dstId = dstIds(i)
72-
edge.attr = data(i)
73-
newData(i) = f(edge)
74-
i += 1
75-
}
76-
new EdgePartition(srcIds, dstIds, newData, index)
65+
val newData = (0 until data.size).map(i => f(Edge(srcIds(i), dstIds(i), data(i))))
66+
new EdgePartition(srcIds, dstIds, newData.toArray, index)
7767
}
7868

7969
/**
@@ -84,19 +74,13 @@ class EdgePartition[@specialized(Char, Int, Boolean, Byte, Long, Float, Double)
8474
* order of the edges returned by `EdgePartition.iterator` and
8575
* should return attributes equal to the number of edges.
8676
*
87-
* @param f a function from an edge to a new attribute
77+
* @param iter an iterator for the new attribute values
8878
* @tparam ED2 the type of the new attribute
89-
* @return a new edge partition with the result of the function `f`
90-
* applied to each edge
79+
* @return a new edge partition with the attribute values replaced
9180
*/
9281
def map[ED2: ClassTag](iter: Iterator[ED2]): EdgePartition[ED2] = {
93-
val newData = new Array[ED2](data.size)
94-
var i = 0
95-
while (iter.hasNext) {
96-
newData(i) = iter.next()
97-
i += 1
98-
}
99-
assert(newData.size == i)
82+
val newData = iter.toArray
83+
assert(newData.size == data.size)
10084
new EdgePartition(srcIds, dstIds, newData, index)
10185
}
10286

@@ -191,15 +175,12 @@ class EdgePartition[@specialized(Char, Int, Boolean, Byte, Long, Float, Double)
191175
* @return an iterator over edges in the partition
192176
*/
193177
def iterator = new Iterator[Edge[ED]] {
194-
private[this] val edge = new Edge[ED]
195178
private[this] var pos = 0
196179

197180
override def hasNext: Boolean = pos < EdgePartition.this.size
198181

199182
override def next(): Edge[ED] = {
200-
edge.srcId = srcIds(pos)
201-
edge.dstId = dstIds(pos)
202-
edge.attr = data(pos)
183+
val edge = Edge(srcIds(pos), dstIds(pos), data(pos))
203184
pos += 1
204185
edge
205186
}
@@ -218,7 +199,6 @@ class EdgePartition[@specialized(Char, Int, Boolean, Byte, Long, Float, Double)
218199
* cluster must start at position `index`.
219200
*/
220201
private def clusterIterator(srcId: VertexId, index: Int) = new Iterator[Edge[ED]] {
221-
private[this] val edge = new Edge[ED]
222202
private[this] var pos = index
223203

224204
override def hasNext: Boolean = {
@@ -227,9 +207,7 @@ class EdgePartition[@specialized(Char, Int, Boolean, Byte, Long, Float, Double)
227207

228208
override def next(): Edge[ED] = {
229209
assert(srcIds(pos) == srcId)
230-
edge.srcId = srcIds(pos)
231-
edge.dstId = dstIds(pos)
232-
edge.attr = data(pos)
210+
val edge = Edge(srcIds(pos), dstIds(pos), data(pos))
233211
pos += 1
234212
edge
235213
}

graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeTripletIterator.scala

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -37,20 +37,15 @@ class EdgeTripletIterator[VD: ClassTag, ED: ClassTag](
3737
// Current position in the array.
3838
private var pos = 0
3939

40-
// A triplet object that this iterator.next() call returns. We reuse this object to avoid
41-
// allocating too many temporary Java objects.
42-
private val triplet = new EdgeTriplet[VD, ED]
43-
4440
private val vmap = new PrimitiveKeyOpenHashMap[VertexId, VD](vidToIndex, vertexArray)
4541

4642
override def hasNext: Boolean = pos < edgePartition.size
4743

4844
override def next() = {
45+
val triplet = new EdgeTriplet[VD, ED]
4946
triplet.srcId = edgePartition.srcIds(pos)
50-
// assert(vmap.containsKey(e.src.id))
5147
triplet.srcAttr = vmap(triplet.srcId)
5248
triplet.dstId = edgePartition.dstIds(pos)
53-
// assert(vmap.containsKey(e.dst.id))
5449
triplet.dstAttr = vmap(triplet.dstId)
5550
triplet.attr = edgePartition.data(pos)
5651
pos += 1

0 commit comments

Comments
 (0)